ストリームにおけるバックプレッシャー
データ処理中に発生する一般的な問題として、バックプレッシャーがあります。これは、データ転送中にバッファの後ろにデータが蓄積することを指します。転送の受信側で複雑な処理が行われている場合、または何らかの理由で速度が遅い場合、着信ソースからのデータが詰まりのように蓄積する傾向があります。
この問題を解決するには、データが一方のソースからもう一方のソースへスムーズに流れるようにするための委任システムが必要です。さまざまなコミュニティが独自のプログラムでこの問題を解決しており、Unix パイプと TCP ソケットはその良い例であり、しばしばフロー制御と呼ばれています。Node.js では、ストリームがこのソリューションとして採用されています。
このガイドの目的は、バックプレッシャーとは何か、そして Node.js のソースコードでストリームがどのようにこれを解決するかについて詳しく説明することです。ガイドの後半では、ストリームを実装する際にアプリケーションコードの安全性を確保し、最適化するための推奨されるベストプラクティスを紹介します。
Node.js におけるbackpressure
、Buffer
、EventEmitters
の一般的な定義、およびStream
に関するある程度の経験があることを前提としています。これらのドキュメントを読んだことがない場合は、API ドキュメントを最初に確認することをお勧めします。これにより、このガイドを読む際の理解が深まります。
データ処理の問題点
コンピューターシステムでは、データはパイプ、ソケット、シグナルを介してあるプロセスから別のプロセスに転送されます。Node.js では、Stream
と呼ばれる同様のメカニズムがあります。ストリームは素晴らしいです!Node.js にとって非常に多くのことを行い、内部コードベースのほぼすべての部分がそのモジュールを利用しています。開発者として、あなたもそれらを使用することを強くお勧めします!
const readline = require('node:readline')
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
})
rl.question('なぜストリームを使用するべきですか? ', answer => {
console.log(`おそらく${answer}でしょう、あるいは、それらが素晴らしいからかもしれません!`)
})
rl.close()
ストリームによって実装されたバックプレッシャーメカニズムが優れた最適化である理由の良い例は、Node.js のストリーム実装からの内部システムツールを比較することで示すことができます。
あるシナリオでは、大きなファイル(約-9GB)を取り、おなじみのzip(1)
ツールを使用して圧縮します。
zip The.Matrix.1080p.mkv
これは完了するのに数分かかりますが、別のシェルでは、別の圧縮ツールgzip(1)
をラップする Node.js のモジュールzlib
を使用するスクリプトを実行できます。
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')
const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')
inp.pipe(gzip).pipe(out)
結果をテストするには、各圧縮されたファイルを開いてみてください。zip(1)
ツールで圧縮されたファイルは、ファイルが破損していることを通知しますが、Stream によって完了した圧縮はエラーなく解凍されます。
注意
この例では、.pipe()
を使用して、データソースを一方の端からもう一方の端に取得します。ただし、適切なエラーハンドラーは接続されていません。データチャンクが適切に受信されなかった場合、Readable ソースまたはgzip
ストリームは破棄されません。pump
は、パイプライン内のストリームのいずれかが失敗または閉じられた場合にそれらを適切に破棄するユーティリティツールであり、この場合必須です!
pump
は Node.js 8.x 以前でのみ必要です。Node.js 10.x 以降では、pump
の代わりにpipeline
が導入されています。これは、エラーを転送し、適切にクリーンアップし、パイプラインが完了したときにコールバックを提供するストリーム間のパイプを行うモジュールメソッドです。
pipeline
の使用例を以下に示します。
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// pipeline APIを使用して、一連のストリームを簡単にパイプし、パイプラインが完全に完了したときに通知を受け取ります。
// 潜在的に巨大なビデオファイルを効率的にgzipするためのパイプライン:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err)
} else {
console.log('Pipeline succeeded')
}
}
)
stream/promises
モジュールを使用して、async / await
で pipeline を使用することもできます。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
)
console.log('Pipeline succeeded')
} catch (err) {
console.error('Pipeline failed', err)
}
}
データが多すぎる、速すぎる
Readable
ストリームがWritable
にデータを速すぎるほど速く渡してしまう場合があります。消費者が処理できる量をはるかに超えるほどです!
そうなると、消費者は後から消費するためにデータのチャンクをすべてキューに入れ始めます。書き込みキューはどんどん長くなり、そのため、プロセス全体が完了するまで、より多くのデータをメモリに保持する必要があります。
ディスクへの書き込みはディスクからの読み取りよりもはるかに遅いので、ファイルを圧縮してハードディスクに書き込もうとすると、書き込みディスクが読み取り速度に追いつけなくなるため、バックプレッシャーが発生します。
// 秘密裏にストリームは言っています:「おいおい!待ってくれ、これは多すぎる!」
// データは、書き込みが着信データフローに追いつこうとする際に、データバッファの読み取り側に蓄積され始めます。
inp.pipe(gzip).pipe(outputFile)
これが、バックプレッシャーメカニズムが重要な理由です。バックプレッシャーシステムが存在しない場合、プロセスはシステムのメモリを使い果たし、他のプロセスの速度を効果的に低下させ、完了するまでシステムの大部分を独占します。
これにより、いくつかの問題が発生します。
- 他のすべての現在のプロセスの速度低下
- 過剰に動作するガベージコレクタ
- メモリ枯渇
以下の例では、.write()
関数の戻り値を取り除き、それをtrue
に変更します。これにより、Node.js コアでのバックプレッシャーサポートを効果的に無効にします。'modified'
バイナリへの参照はすべて、return ret;
行がない node バイナリを実行し、代わりに置き換えられたreturn true;
を使用していることを意味します。
ガベージコレクションへの過剰な負荷
簡単なベンチマークを見てみましょう。上記の例と同じ例を使用して、いくつかの時間試行を行い、両方のバイナリのメジアン時間を得ました。
試行番号 | `node` バイナリ (ms) | 修正済み`node` バイナリ (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
平均時間: | 55299 | 55975
どちらも約 1 分かかりますので、ほとんど違いはありませんが、私たちの疑いが正しいかどうかを確認するために、もう少し詳しく見てみましょう。Linux ツールdtrace
を使用して、V8 ガベージコレクタで何が起こっているかを評価します。
GC(ガベージコレクタ)の測定時間は、ガベージコレクタによって行われた単一のスウィープの完全なサイクルの間隔を示しています。
おおよその時間 (ms) | GC (ms) | 修正済み GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
2 つのプロセスは同じように開始し、同じ速度で GC を動作させているように見えますが、適切に動作するバックプレッシャーシステムが配置されていると、数秒後には、データ転送の終了まで、4〜8 ミリ秒の一貫した間隔で GC の負荷が分散されることが明らかになります。
しかし、バックプレッシャーシステムがない場合、V8 ガベージコレクションが遅くなり始めます。通常のバイナリは 1 分間に約 75 回 GC を実行しますが、修正済みバイナリは 36 回しか実行しません。
これは、増大するメモリ使用量から蓄積される遅く漸進的な負債です。バックプレッシャーシステムがない状態でデータが転送されると、各チャンク転送ごとに多くのメモリが使用されます。
割り当てられるメモリが多くなるほど、GC は 1 回のスイープで処理する必要があるものが多くなります。スイープが大きくなるほど、GC は解放できるものを決定する必要があり、より大きなメモリ空間で切り離されたポインタをスキャンすると、より多くのコンピューティングパワーが消費されます。
メモリ枯渇
各バイナリのメモリ消費量を調べるため、/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
を使用して各プロセスを個別に計測しました。
通常のバイナリの場合の出力は以下の通りです。
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
仮想メモリによって占有された最大バイトサイズは約 87.81MB です。
次に、.write()
関数の戻り値を変更した場合、以下のようになります。
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
仮想メモリによって占有された最大バイトサイズは約 1.52GB です。
バックプレッシャーを委任するためのストリームがない場合、割り当てられるメモリ空間は桁違いに大きくなります。同じプロセス間で大きな差が生じます!
この実験は、Node.js のバックプレッシャーメカニズムがコンピューティングシステムにとっていかに最適化され、コスト効率が高いかを示しています。それでは、その仕組みを詳しく見ていきましょう!
バックプレッシャーはどのようにこれらの問題を解決するのか?
データをあるプロセスから別のプロセスに転送する様々な関数があります。Node.js では、.pipe()
という内部組み込み関数があります。他にも使用できるパッケージはたくさんあります!しかし、最終的に、このプロセスの基本レベルでは、データのソースとコンシューマという 2 つの別々のコンポーネントがあります。
ソースから.pipe()
が呼び出されると、データが転送されることをコンシューマに知らせます。pipe 関数は、イベントトリガーに適切なバックプレッシャークロージャを設定するのに役立ちます。
Node.js では、ソースはReadable
ストリームであり、コンシューマはWritable
ストリームです(これらは Duplex または Transform ストリームと交換できますが、このガイドでは範囲外です)。
バックプレッシャーがトリガーされる瞬間は、Writable
の.write()
関数の戻り値に正確に絞り込むことができます。この戻り値は、もちろんいくつかの条件によって決定されます。
データバッファがhighwaterMark
を超えた場合、または書き込みキューが現在ビジー状態の場合、.write()
はfalse
を返します。
false
の値が返されると、バックプレッシャーシステムが作動します。着信Readable
ストリームがデータの送信を一時停止し、コンシューマが再び準備できるまで待ちます。データバッファが空になると、'drain'
イベントが発行され、着信データフローが再開されます。
キューが完了すると、バックプレッシャーによりデータの送信が再開されます。使用されていたメモリの空間は解放され、次のデータのバッチの準備が整います。
これは、.pipe()
関数に対して、いつでも一定量のメモリを使用できることを効果的に意味します。メモリリークはなく、無限のバッファリングもなく、ガベージコレクタはメモリの 1 つの領域だけを処理する必要があります!
では、バックプレッシャーがそれほど重要なのに、(おそらく)聞いたことがないのはなぜでしょうか?答えは簡単です。Node.js はこれらすべてを自動的に行ってくれます。
それは素晴らしいことです!しかし、カスタムストリームの実装方法を理解しようとしているときには、それほど素晴らしいことではありません。
注意
ほとんどのマシンでは、バッファがいっぱいになったときに決定されるバイトサイズがあります(マシンによって異なります)。Node.js ではカスタムhighWaterMark
を設定できますが、一般的にデフォルトは 16kb(16384、または objectMode ストリームの場合は 16)に設定されています。その値を上げたい場合もありますが、注意して行ってください!
.pipe()
のライフサイクル
バックプレッシャーをより深く理解するために、Readable
ストリームがWritable
ストリームにパイプされる際のライフサイクルを示すフローチャートを以下に示します。
+===================+
x--> パイプ関数 +--> src.pipe(dest) |
x は.pipeメソッド中に |===================|
x 設定されます。 | イベントコールバック|
+===============+ x |-------------------|
| あなたのデータ | x データフローの外に | .on('close', cb) |
+=======+=======+ x 存在しますが、 | .on('data', cb) |
| x 重要なイベントと | .on('drain', cb) |
| x それぞれのコールバックを| .on('unpipe', cb) |
+---------v---------+ x アタッチします。 | .on('error', cb) |
| Readable ストリーム +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable ストリーム +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | このチャンクは大きすぎますか? |
^ | | .end() を emit; | キューはビジーですか? |
| | +-> else +-------+----------------+---+
| ^ | .write() を emit; | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< いいえ | | はい |
^ | +------+ +---v---+
^ | |
| ^ .pause() を emit; +=================+ |
| ^---------------^-----------------------+ false を返す; <-----+---+
| +=================+ |
| |
^ キューが空の場合 +============+ |
^------------^-----------------------< バッファリング | |
| |============| |
+> .drain() を emit; | ^バッファ^ | |
+> .resume() を emit; +------------+ |
| ^バッファ^ | |
+------------+ キューにチャンクを追加 |
| <---^---------------------<
+============+
注意
いくつかのストリームをチェーンしてデータを操作するパイプラインを設定する場合は、Transform ストリームを実装する可能性が高いです。
この場合、Readable
ストリームからの出力はTransform
に入り、Writable
にパイプされます。
Readable.pipe(Transformable).pipe(Writable)
バックプレッシャーは自動的に適用されますが、Transform
ストリームの入力と出力のhighwaterMark
の両方を操作でき、バックプレッシャーシステムに影響することに注意してください。
バックプレッシャーに関するガイドライン
Node.js v0.10 以降、Stream クラスは、read()
またはwrite()
の動作をそれぞれのアンダースコアバージョンの関数(._read()
と._write()
)を使用して変更できるようになりました。
Readable ストリームの実装と Writable ストリームの実装に関するガイドラインが文書化されています。これらについては既にお読み済みであると仮定し、次のセクションではもう少し詳しく説明します。
カスタムストリーム実装時の遵守事項
ストリームの黄金律は、常にバックプレッシャーを尊重することです。ベストプラクティスとは矛盾のないプラクティスです。内部のバックプレッシャーサポートと競合する動作を回避することに注意していれば、良いプラクティスに従っていることを確信できます。
一般的に、
- 要求されていない限り
.push()
を呼び出さないこと。 write()
が false を返した後もwrite()
を呼び出さず、代わりに'drain'を待つこと。- ストリームは異なる Node.js バージョンや使用するライブラリ間で変化します。注意してテストを行うこと。
注意
3 点目に関して、ブラウザストリームを構築するための非常に便利なパッケージはreadable-stream
です。Rodd Vagg は、このライブラリの有用性について優れたブログ記事を書いています。簡単に言うと、Readable ストリームに対して一種の自動的なグレースフルデグレーションを提供し、古いバージョンのブラウザと Node.js をサポートします。
Readable ストリームに固有のルール
これまで、.write()
がバックプレッシャーにどのように影響するかを見てきました。Writable ストリームに重点を置いてきました。Node.js の機能により、データは技術的には Readable から Writable へ下流に流れています。しかし、データ、物質、またはエネルギーの伝送において観察できる通り、発信元は宛先と同じくらい重要であり、Readable ストリームはバックプレッシャーの処理において不可欠です。
これらのプロセスはどちらも効果的に通信するために互いに依存しています。Readable が Writable ストリームがデータの送信停止を要求したことを無視すると、.write()
の戻り値が正しくない場合と同じくらい問題になる可能性があります。
したがって、.write()
の戻り値を尊重するだけでなく、._read()
メソッドで使用される.push()
の戻り値も尊重する必要があります。.push()
が false を返した場合、ストリームはソースからの読み取りを停止します。そうでなければ、一時停止せずに継続します。
以下は.push()
を使用した悪いプラクティスの例です。
// これは、pushからの戻り値(宛先ストリームからのバックプレッシャーのシグナルである可能性があります!)を完全に無視するため問題があります。
class MyReadable extends Readable {
_read(size) {
let chunk
while (null == (chunk = getNextChunk())) {
this.push(chunk)
}
}
}
さらに、カスタムストリームの外から、バックプレッシャーを無視することには落とし穴があります。この良いプラクティスの反例では、アプリケーションのコードは利用可能なデータ('data'イベントで信号が送信される)を常に強制的に送信します。
// これは、Node.jsが設置したバックプレッシャーメカニズムを無視し、宛先ストリームの準備ができていなくても、無条件にデータをプッシュします。
readable.on('data', data => writable.write(data))
以下は、.push()
を Readable ストリームで使用した例です。
const { Readable } = require('node:stream')
// カスタムReadableストリームを作成する
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// ストリームにいくつかのデータを送信する
this.push({ message: 'Hello, world!' })
this.push(null) // ストリームの終了を示す
},
})
// ストリームを使う
myReadableStream.on('data', chunk => {
console.log(chunk)
})
// 出力:
// { message: 'Hello, world!' }
書き込み可能ストリームに固有のルール
.write()
は、いくつかの条件に応じて true または false を返すことを思い出してください。幸いなことに、独自の書き込み可能ストリームを構築する場合、ストリーム状態マシンがコールバックを処理し、バックプレッシャーを処理し、データの流れを最適化するタイミングを決定します。ただし、書き込み可能ストリームを直接使用する場合、.write()
の戻り値を尊重し、これらの条件に注意深く対処する必要があります。
- 書き込みキューがビジー状態の場合、
.write()
は false を返します。 - データチャンクが大きすぎる場合、
.write()
は false を返します(制限は変数 highWaterMark で示されます)。
この例では、.push()
を使用して単一のオブジェクトをストリームにプッシュするカスタム読み込み可能ストリームを作成します。._read()
メソッドは、ストリームがデータの消費の準備ができているときに呼び出され、この場合、すぐにストリームにいくつかのデータプッシュし、null
をプッシュすることでストリームの終わりを示します。
const stream = require('stream')
class MyReadable extends stream.Readable {
constructor() {
super()
}
_read() {
const data = { message: 'Hello, world!' }
this.push(data)
this.push(null)
}
}
const readableStream = new MyReadable()
readableStream.pipe(process.stdout)
次に、'data' イベントをリッスンし、ストリームにプッシュされたデータの各チャンクをログに記録することで、ストリームを消費します。この場合、ストリームに単一のデータチャンクのみをプッシュするため、ログメッセージは 1 つだけ表示されます。
書き込み可能ストリームに固有のルール
.write()
は、いくつかの条件に応じて true または false を返すことを思い出してください。幸いなことに、独自の書き込み可能ストリームを構築する場合、ストリーム状態マシンがコールバックを処理し、バックプレッシャーを処理し、データの流れを最適化するタイミングを決定します。
ただし、書き込み可能ストリームを直接使用する場合、.write()
の戻り値を尊重し、これらの条件に注意深く対処する必要があります。
- 書き込みキューがビジー状態の場合、
.write()
は false を返します。 - データチャンクが大きすぎる場合、
.write()
は false を返します(制限は変数 highWaterMark で示されます)。
class MyWritable extends Writable {
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback()
else if (chunk.toString().indexOf('b') >= 0) callback()
callback()
}
}
._writev()
を実装する際にも注意すべき点があります。この関数は .cork()
と連携していますが、記述時によくある間違いがあります。
// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()
ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()
// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)
ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)
// As a global function.
function doUncork(stream) {
stream.uncork()
}
.cork()
は何度でも呼び出すことができますが、再び流れるようにするには、同じ回数だけ .uncork()
を呼び出すことに注意する必要があります。
まとめ
Stream は Node.js で頻繁に使用されるモジュールです。内部構造にとって重要であり、開発者にとって Node.js モジュールエコシステム全体を拡張および接続するために重要です。
これで、バックプレッシャーを考慮して独自の Writable
ストリームと Readable
ストリームのトラブルシューティングを行い、安全にコーディングし、同僚や友人と知識を共有できるようになることを願っています。
Node.js を使用してアプリケーションを構築する際にストリーミング機能を向上させ、解放するために、他の API 関数に関する Stream
の詳細を必ず読んでください。