ストリームにおけるバックプレッシャー
データ処理中に発生する一般的な問題としてバックプレッシャーがあり、データ転送中のバッファの後ろでのデータの蓄積を表します。転送の受信側が複雑な操作を行っている場合、または何らかの理由で速度が遅い場合、入力ソースからのデータが詰まりのように蓄積する傾向があります。
この問題を解決するには、あるソースから別のソースへのデータのスムーズな流れを保証するための委任システムが必要です。さまざまなコミュニティがプログラムに固有の方法でこの問題を解決しており、UnixパイプとTCPソケットが良い例であり、フロー制御と呼ばれることがよくあります。Node.jsでは、ストリームが採用されたソリューションです。
このガイドの目的は、バックプレッシャーが何かをさらに詳しく説明し、Node.jsのソースコードでストリームがどのように対処しているかを正確に説明することです。ガイドの後半では、ストリームを実装する際にアプリケーションのコードが安全かつ最適化されていることを保証するための推奨されるベストプラクティスを紹介します。
Node.jsにおけるバックプレッシャー
、Buffer
、およびEventEmitter
の一般的な定義、およびStream
の使用経験がある程度あることを前提としています。これらのドキュメントをまだ読んでいない場合は、まずAPIドキュメントを見てみることをお勧めします。このガイドを読む際に理解を深めるのに役立ちます。
データ処理の問題点
コンピュータシステムでは、データはパイプ、ソケット、およびシグナルを介してあるプロセスから別のプロセスに転送されます。Node.jsでは、Stream
と呼ばれる同様のメカニズムが見られます。ストリームは素晴らしいです!Node.jsにとって非常に多くのことを行い、内部コードベースのほぼすべての部分がそのモジュールを利用しています。開発者として、あなたもそれらを使用することを強くお勧めします!
const readline = require('node:readline');
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
});
rl.question('ストリームを使用する理由は何ですか? ', answer => {
console.log(`多分それは${answer}です、多分それはそれらが素晴らしいからでしょう!`);
});
rl.close();
ストリームを介して実装されるバックプレッシャーメカニズムが優れた最適化である理由の良い例は、Node.jsのStream実装からの内部システムツールを比較することで実証できます。
あるシナリオでは、大きなファイル(約-9 GB)を取得し、使い慣れたzip(1)
ツールを使用して圧縮します。
zip The.Matrix.1080p.mkv
完了までに数分かかりますが、別のシェルでは、Node.jsのモジュールzlib
を使用するスクリプトを実行できます。これは、別の圧縮ツールgzip(1)
をラップします。
const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');
const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');
inp.pipe(gzip).pipe(out);
結果をテストするには、各圧縮ファイルを開いてみてください。zip(1)
ツールで圧縮されたファイルは、ファイルが破損していることを通知しますが、Streamによって完了した圧縮はエラーなしで解凍されます。
注
この例では、.pipe()
を使用して、データソースを一方の端から他方の端に取得します。ただし、適切なエラーハンドラーが添付されていないことに注意してください。データのチャンクが適切に受信されなかった場合、Readableソースまたはgzip
ストリームは破棄されません。pump
は、パイプライン内のストリームのいずれかが失敗または閉じられた場合に、すべてのストリームを適切に破棄するユーティリティツールであり、この場合は必須です!
pump
はNode.js 8.x以前でのみ必要です。Node.js 10.x以降のバージョンでは、pipeline
がpump
の代わりとして導入されています。これは、エラーを転送し、パイプラインが完了したときに適切にクリーンアップしてコールバックを提供するストリーム間をパイプするモジュールメソッドです。
pipelineの使用例を次に示します。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// パイプラインAPIを使用して、一連のストリームを簡単にパイプで接続し、
// パイプラインが完全に完了したときに通知を受け取ります。
// 効率的に巨大なビデオファイルをgzip圧縮するパイプライン:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('パイプラインが失敗しました', err);
} else {
console.log('パイプラインが成功しました');
}
}
);
また、stream/promises
モジュールを使用して、async / await
でpipelineを使用することもできます。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
);
console.log('パイプラインが成功しました');
} catch (err) {
console.error('パイプラインが失敗しました', err);
}
}
データが多すぎる、速すぎる
Readable
ストリームが Writable
に対して、消費者が処理できる量よりもはるかに速くデータを供給する場合があります。
その場合、消費者は後で消費するためにデータのチャンクをすべてキューに入れ始めます。書き込みキューはますます長くなり、このため、プロセス全体が完了するまでより多くのデータをメモリに保持する必要があります。
ディスクへの書き込みは、ディスクからの読み取りよりもはるかに遅いため、ファイルを圧縮してハードディスクに書き込もうとすると、書き込みディスクが読み取りの速度に追いつけないため、背圧が発生します。
// ストリームは密かにこう言っています: "待って、待って! ちょっと待って、これは多すぎる!"
// データは、書き込みが入力データフローに追いつこうとするにつれて、データバッファの読み取り側で蓄積し始めます。
inp.pipe(gzip).pipe(outputFile);
これが、背圧メカニズムが重要な理由です。背圧システムが存在しない場合、プロセスはシステムのメモリを使い果たし、他のプロセスを効果的に遅らせ、完了するまでシステムのかなりの部分を独占します。
これは、次のようないくつかの結果をもたらします。
- 他のすべての現在のプロセスを遅らせる
- 非常に過労なガベージコレクター
- メモリ枯渇
次の例では、.write()
関数の戻り値を取り除き、true
に変更します。これにより、Node.js コアの背圧サポートが効果的に無効になります。 'modified'
バイナリへの参照では、return ret;
行なしで、代わりに置き換えられた return true;
でノードバイナリを実行することについて話しています。
ガベージコレクションへの過剰な負荷
簡単なベンチマークを見てみましょう。上記の例と同じ例を使用して、いくつかのタイムトライアルを実行し、両方のバイナリの中央値を取得しました。
trial (#) | `node` binary (ms) | modified `node` binary (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
average time: | 55299 | 55975
どちらも実行に約 1 分かかるため、違いはほとんどありませんが、疑念が正しいかどうかを確認するために詳しく見てみましょう。 Linux ツール dtrace
を使用して、V8 ガベージコレクターで何が起こっているかを評価します。
GC (ガベージコレクター) の測定時間は、ガベージコレクターによる単一スイープのフルサイクルの間隔を示します。
approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
2 つのプロセスは同じように開始し、GC を同じレートで動作させているように見えますが、適切に動作する背圧システムを導入すると、データの転送が終了するまで、GC の負荷を 4 ~ 8 ミリ秒の間隔で一貫して分散させることが明らかになります。
ただし、背圧システムが導入されていない場合、V8 ガベージコレクションは引き伸ばし始めます。通常のバイナリは 1 分間に約 75 回 GC を実行しますが、変更されたバイナリは 36 回しか実行しません。
これは、メモリ使用量の増加から蓄積されるゆっくりとした、段階的な債務です。データが転送されるにつれて、背圧システムが存在しない場合、チャンク転送ごとにより多くのメモリが使用されます。
割り当てられるメモリが多いほど、GC は 1 回のスイープで処理する必要がある量が多くなります。スイープが大きいほど、GC は解放できるものを決定する必要があり、より大きなメモリ空間で切り離されたポインターをスキャンするには、より多くの計算能力が必要になります。
メモリ枯渇
各バイナリのメモリ消費量を特定するために、各プロセスを /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
で個別に計測しました。
これは通常のバイナリでの出力です。
.write() の戻り値を尊重する場合
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
仮想メモリによって占有される最大バイトサイズは約 87.81 mb であることがわかりました。
そして、.write()
関数の戻り値を変更すると、次のようになります。
.write() の戻り値を尊重しない場合:
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
仮想メモリによって占有される最大バイトサイズは約 1.52 gb であることがわかりました。
バックプレッシャーを委譲するストリームがない場合、割り当てられるメモリ空間は桁違いに大きくなります。これは同じプロセス間で大きな違いです。
この実験は、Node.js のバックプレッシャーメカニズムがコンピューティングシステムにとってどれほど最適化され、費用対効果が高いかを示しています。次に、その仕組みを詳しく見ていきましょう。
バックプレッシャーはこれらの問題をどのように解決するのか?
あるプロセスから別のプロセスへデータを転送する方法は様々です。Node.jsには、.pipe()
という内部組み込み関数があります。他にも使えるパッケージはありますが、結局のところ、このプロセスの基本的なレベルでは、データのソースとコンシューマーという2つの異なるコンポーネントがあります。
.pipe()
がソースから呼び出されると、コンシューマーにデータが転送されることを知らせます。pipe関数は、イベントトリガーに対する適切なバックプレッシャークロージャーを設定するのに役立ちます。
Node.jsでは、ソースはReadable
ストリーム、コンシューマーはWritable
ストリームです(これらは両方ともDuplexまたはTransformストリームと交換可能ですが、このガイドの範囲外です)。
バックプレッシャーがトリガーされる瞬間は、Writable
の.write()
関数の戻り値に正確に絞り込むことができます。この戻り値は、もちろんいくつかの条件によって決定されます。
データバッファがhighwaterMark
を超えている場合、または書き込みキューが現在ビジー状態である場合、.write()
はfalse
を返します。
false
値が返されると、バックプレッシャーシステムが作動します。受信側のReadable
ストリームからのデータ送信を一時停止し、コンシューマーが再び準備できるまで待ちます。データバッファが空になると、'drain'
イベントが発行され、受信側のデータフローが再開されます。
キューが終了すると、バックプレッシャーによってデータが再び送信できるようになります。使用されていたメモリ領域は解放され、次のデータのバッチに備えます。
これにより、.pipe()
関数に対して、特定の時点で一定量のメモリを効果的に使用できるようになります。メモリリークや無限バッファリングはなくなり、ガベージコレクターはメモリ内の1つの領域だけを処理すればよくなります。
では、バックプレッシャーが非常に重要であるならば、なぜ(おそらく)聞いたことがないのでしょうか?その答えは簡単です。Node.jsがこれらすべてを自動的に行ってくれるからです。
それは素晴らしいことですが、カスタムストリームの実装方法を理解しようとしているときは、そうでもありません。
NOTE
ほとんどのマシンでは、バッファがフルになったと判断するバイトサイズがあります(これはマシンによって異なります)。Node.jsでは、カスタムのhighWaterMark
を設定できますが、通常、デフォルトは16kb(objectModeストリームの場合は16384、または16)に設定されています。その値を上げたい場合は、実行しても構いませんが、注意して行ってください!
.pipe()
のライフサイクル
バックプレッシャーをより良く理解するために、Readable
ストリームが Writable
ストリームに パイプ される際のライフサイクルのフローチャートを以下に示します。
+===================+
x--> パイプ処理関数 +--> src.pipe(dest) |
x は .pipe メソッド |===================|
x 中にセットアップされます。 | イベントコールバック |
+===============+ x |-------------------|
| あなたのデータ | x これらはデータフローの外 | .on('close', cb) |
+=======+=======+ x に存在しますが、重要なことに | .on('data', cb) |
| x イベントとそれぞれのコールバック | .on('drain', cb) |
| x をアタッチします。 | .on('unpipe', cb) |
+---------v---------+ x | .on('error', cb) |
| Readable ストリーム +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | | +-------------------+ +=================+
^ ^ ^ +----> Writable ストリーム +---------> .write(chunk) |
^ | ^ +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | このチャンクは大きすぎますか? |
^ | | emit .end(); | キューはビジーですか? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
^ | < いいえ | | はい |
^ | +------+ +---v---+
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ キューが空の場合 +============+ |
^------------^-----------------------< バッファリング | |
| |============| |
+> emit .drain(); | ^Buffer^ | |
+> emit .resume(); +------------+ |
| ^Buffer^ | |
+------------+ チャンクをキューに追加 |
| <---^---------------------<
+============+
NOTE
データを操作するためにいくつかのストリームをチェーンするパイプラインをセットアップする場合は、Transform ストリームを実装することがほとんどでしょう。
この場合、Readable
ストリームからの出力は Transform
に入り、Writable
にパイプされます。
Readable.pipe(Transformable).pipe(Writable);
バックプレッシャーは自動的に適用されますが、Transform
ストリームの入力と出力の highwaterMark
の両方が操作される可能性があり、バックプレッシャーシステムに影響を与えることに注意してください。
バックプレッシャーのガイドライン
Node.js v0.10 以降、Stream クラスは、それぞれ対応する関数 (._read()
と ._write()
) のアンダースコアバージョンを使用することで、.read()
または .write()
の動作を変更する機能を提供しています。
Readable ストリームの実装と Writable ストリームの実装について文書化されたガイドラインがあります。これらに目を通したことを前提として、次のセクションでさらに詳しく説明します。
カスタムストリームを実装する際に従うべきルール
ストリームの黄金律は、常にバックプレッシャーを尊重することです。ベストプラクティスを構成するものは、矛盾のないプラクティスです。内部バックプレッシャーのサポートと矛盾する動作を避けるように注意すれば、良いプラクティスに従っていることを確認できます。
一般的に、
- 要求されない限り、決して
.push()
しないでください。 false
を返した後、決して.write()
を呼び出さず、代わりに 'drain' を待ってください。- ストリームは Node.js のバージョンや使用するライブラリによって異なります。注意してテストしてください。
NOTE
3 点目に関して、ブラウザストリームを構築するための非常に便利なパッケージは readable-stream
です。Rodd Vagg は、このライブラリの有用性について素晴らしいブログ記事を書いています。簡単に言うと、Readable ストリームに対してある種の自動的なグレースフルデグラデーションを提供し、古いバージョンのブラウザと Node.js をサポートします。
Readable ストリームに固有のルール
これまでのところ、.write()
がバックプレッシャーにどのように影響するかを見てきており、Writable ストリームに焦点を当ててきました。Node.js の機能により、データは技術的には Readable から Writable へとダウンストリームに流れています。ただし、データ、物質、またはエネルギーの伝送で観察できるように、ソースは宛先と同じくらい重要であり、Readable ストリームはバックプレッシャーの処理方法にとって不可欠です。
これらのプロセスは両方とも、効果的に通信するために互いに依存しています。Writable ストリームがデータの送信を停止するように要求したときに Readable が無視すると、.write()
の戻り値が正しくない場合と同じくらい問題が発生する可能性があります。
したがって、.write()
の戻り値を尊重するだけでなく、._read()
メソッドで使用される .push()
の戻り値も尊重する必要があります。.push()
が false
値を返すと、ストリームはソースからの読み取りを停止します。それ以外の場合は、一時停止することなく続行されます。
.push()
を使用した悪い例を次に示します。
// これは、プッシュからの戻り値を完全に無視するため問題があります
// これは、宛先ストリームからのバックプレッシャーのシグナルである可能性があります!
class MyReadable extends Readable {
_read(size) {
let chunk;
while (null == (chunk = getNextChunk())) {
this.push(chunk);
}
}
}
さらに、カスタムストリームの外部から、バックプレッシャーを無視することに落とし穴があります。この良い例の反例では、アプリケーションのコードは、データが利用可能になるたびに('data'
イベントによって通知されます)データを強制的に通過させます。
// これは、Node.js が設定したバックプレッシャーメカニズムを無視し、
// 宛先ストリームの準備ができているかどうかに関係なく、
// データを無条件にプッシュします。
readable.on('data', data => writable.write(data));
Readable ストリームで .push()
を使用する例を次に示します。
const { Readable } = require('node:stream');
// カスタムの Readable ストリームを作成します
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// ストリームにデータをプッシュします
this.push({ message: 'Hello, world!' });
this.push(null); // ストリームの終わりをマークします
},
});
// ストリームを消費します
myReadableStream.on('data', chunk => {
console.log(chunk);
});
// 出力:
// { message: 'Hello, world!' }
Writableストリームに固有のルール
.write()
は、いくつかの条件に応じてtrueまたはfalseを返す可能性があることを思い出してください。幸運なことに、独自のWritableストリームを構築する場合、ストリーム状態マシンがコールバックを処理し、バックプレッシャーを処理してデータフローを最適化するタイミングを決定します。ただし、Writableを直接使用する場合は、.write()
の戻り値を尊重し、次の条件に注意する必要があります。
- 書き込みキューがビジーの場合、
.write()
はfalseを返します。 - データチャンクが大きすぎる場合、
.write()
はfalseを返します(制限は変数highWaterMarkで示されます)。
この例では、.push()
を使用して単一のオブジェクトをストリームにプッシュするカスタムReadableストリームを作成します。ストリームがデータを消費する準備ができると、._read()
メソッドが呼び出されます。この場合、すぐにいくつかのデータをストリームにプッシュし、null
をプッシュしてストリームの終わりを示します。
const stream = require('stream');
class MyReadable extends stream.Readable {
constructor() {
super();
}
_read() {
const data = { message: 'Hello, world!' };
this.push(data);
this.push(null);
}
}
const readableStream = new MyReadable();
readableStream.pipe(process.stdout);
次に、「data」イベントをリッスンし、ストリームにプッシュされる各データチャンクをログに記録して、ストリームを消費します。この場合、ストリームにプッシュするデータチャンクは1つだけなので、ログメッセージは1つしか表示されません。
Writableストリームに固有のルール
.write()
は、いくつかの条件に応じてtrueまたはfalseを返す可能性があることを思い出してください。幸運なことに、独自のWritableストリームを構築する場合、ストリーム状態マシンがコールバックを処理し、バックプレッシャーを処理してデータフローを最適化するタイミングを決定します。
ただし、Writableを直接使用する場合は、.write()
の戻り値を尊重し、次の条件に注意する必要があります。
- 書き込みキューがビジーの場合、
.write()
はfalseを返します。 - データチャンクが大きすぎる場合、
.write()
はfalseを返します(制限は変数highWaterMarkで示されます)。
class MyWritable extends Writable {
// このwritableは、JavaScriptコールバックの非同期的な性質のために無効です。
// 最後より前の各コールバックのreturnステートメントがない場合、
// 複数のコールバックが呼び出される可能性が高くなります。
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback();
else if (chunk.toString().indexOf('b') >= 0) callback();
callback();
}
}
._writev()
を実装する際に注意すべき点もあります。この関数は.cork()
と結合されていますが、記述するときによくある間違いがあります。
// ここで.uncork()を2回使用すると、C++レイヤーで2回呼び出されるため、
// cork/uncorkテクニックは役に立たなくなります。
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();
ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();
// これを正しく記述する方法は、次のイベントループで発生するprocess.nextTick()を利用することです。
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);
ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);
// グローバル関数として。
function doUncork(stream) {
stream.uncork();
}
.cork()
は何度でも呼び出すことができます。フローを再度有効にするには、.uncork()
を同じ回数だけ呼び出すように注意する必要があります。
結論
ストリームは、Node.js で頻繁に使用されるモジュールです。ストリームは内部構造にとって重要であり、開発者が Node.js モジュールエコシステム全体に拡張して接続するためにも重要です。
この記事を通じて、バックプレッシャーを考慮しながら、独自の Writable
および Readable
ストリームをトラブルシューティングし、安全にコーディングできるようになり、同僚や友人と知識を共有できるようになることを願っています。
Node.js でアプリケーションを構築する際に、ストリーミング機能を向上させ、最大限に活用するために、他の API 関数について Stream
をさらに調べてみてください。