ストリーム
ソースコード: lib/stream.js
ストリームは、Node.js でストリーミングデータを取り扱うための抽象インターフェースです。node:stream
モジュールは、ストリームインターフェースを実装するための API を提供します。
Node.js には多くのストリームオブジェクトが用意されています。例えば、HTTP サーバーへのリクエストやprocess.stdout
はどちらもストリームインスタンスです。
ストリームは、読み取り可能、書き込み可能、またはその両方である場合があります。すべてのストリームはEventEmitter
のインスタンスです。
node:stream
モジュールにアクセスするには:
const stream = require('node:stream')
node:stream
モジュールは、新しいタイプのストリームインスタンスを作成するのに役立ちます。ストリームを消費するためにnode:stream
モジュールを使用する必要は通常ありません。
このドキュメントの構成
このドキュメントには、2 つの主要なセクションと、メモのための 3 番目のセクションが含まれています。最初のセクションでは、アプリケーション内で既存のストリームを使用する方法を説明します。2 番目のセクションでは、新しいタイプのストリームを作成する方法を説明します。
ストリームの種類
Node.js には 4 つの基本的なストリームタイプがあります。
Writable
: データを書き込むことができるストリーム(例:fs.createWriteStream()
)。Readable
: データを読み取ることができるストリーム(例:fs.createReadStream()
)。Duplex
:Readable
とWritable
の両方であるストリーム(例:net.Socket
)。Transform
: 書き込みと読み取り時にデータを変更または変換できるDuplex
ストリーム(例:zlib.createDeflate()
)。
さらに、このモジュールには、ユーティリティ関数stream.duplexPair()
、stream.pipeline()
、stream.finished()
、stream.Readable.from()
、およびstream.addAbortSignal()
が含まれています。
Streams Promises API
追加日時: v15.0.0
stream/promises
API は、コールバックを使用するのではなくPromise
オブジェクトを返すストリームのための非同期ユーティリティ関数の代替セットを提供します。この API はrequire('node:stream/promises')
またはrequire('node:stream').promises
からアクセスできます。
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[履歴]
バージョン | 変更 |
---|---|
v18.0.0, v17.2.0, v16.14.0 | end オプションを追加。これはfalse に設定することで、ソースが終了したときにデスティネーションストリームを自動的に閉じないようにできます。 |
v15.0.0 | 追加日時: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 戻り値: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 戻り値: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 戻り値: <Promise> | <AsyncIterable>
options
<Object> パイプラインオプションsignal
<AbortSignal>end
<boolean> ソースストリームの終了時にデスティネーションストリームを終了します。トランスフォームストリームは、この値がfalse
の場合でも常に終了します。デフォルト:true
。
戻り値: <Promise> パイプラインが完了すると解決されます。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
AbortSignal
を使用するには、最後の引数としてオプションオブジェクト内に渡します。シグナルが中断されると、AbortError
を使用して基になるパイプラインでdestroy
が呼び出されます。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
pipeline
API は非同期ジェネレータもサポートします。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // `Buffer`ではなく文字列で処理します。
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // `Buffer`ではなく文字列で処理します。
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
非同期ジェネレータに渡されたsignal
引数を処理することを忘れないでください。特に、非同期ジェネレータがパイプラインのソース(つまり、最初の引数)である場合、パイプラインは決して完了しません。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
pipeline
API はコールバックバージョンも提供します。
stream.finished(stream[, options])
[履歴]
バージョン | 変更 |
---|---|
v19.5.0, v18.14.0 | ReadableStream とWritableStream のサポートを追加 |
v19.1.0, v18.13.0 | cleanup オプションを追加 |
v15.0.0 | 追加: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> 読み込み可能および/または書き込み可能なストリーム/ウェブストリーム。options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined>true
の場合、プロミスが解決される前に、この関数によって登録されたリスナーを削除します。デフォルト:false
。
戻り値: <Promise> ストリームが読み込み不可または書き込み不可になったときに解決されます。
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream is done reading.')
}
run().catch(console.error)
rs.resume() // ストリームを排出します。
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream is done reading.')
}
run().catch(console.error)
rs.resume() // ストリームを排出します。
finished
API は、コールバックバージョンも提供します。
stream.finished()
は、返されたプロミスが解決または拒否された後も、ぶら下がっているイベントリスナー(特に'error'
、'end'
、'finish'
、'close'
)を残します。これは、(ストリームの実装が正しくないために)予期しない'error'
イベントが予期しないクラッシュを引き起こさないようにするためです。これが望ましくない動作である場合は、options.cleanup
をtrue
に設定する必要があります。
await finished(rs, { cleanup: true })
オブジェクトモード
Node.js API によって作成されたすべてのストリームは、文字列、<Buffer>、<TypedArray>、および<DataView>オブジェクトでのみ動作します。
Strings
とBuffers
は、ストリームで使用される最も一般的なタイプです。TypedArray
とDataView
を使用すると、Int32Array
やUint8Array
などの型でバイナリデータを取り扱うことができます。TypedArray
またはDataView
をストリームに書き込むと、Node.js は生のバイトを処理します。
ただし、ストリームの実装が他の種類の JavaScript 値(ストリーム内で特別な役割を果たすnull
を除く)で動作することも可能です。このようなストリームは、「オブジェクトモード」で動作すると見なされます。
ストリームインスタンスは、ストリームの作成時にobjectMode
オプションを使用してオブジェクトモードに切り替わります。既存のストリームをオブジェクトモードに切り替える試みは安全ではありません。
バッファリング
Writable
とReadable
ストリームの両方が、内部バッファにデータを格納します。
バッファされる可能性のあるデータの量は、ストリームのコンストラクタに渡されるhighWaterMark
オプションによって異なります。通常のストリームの場合、highWaterMark
オプションはバイトの総数を指定します。オブジェクトモードで動作するストリームの場合、highWaterMark
はオブジェクトの総数を指定します。文字列を操作する(ただしデコードしない)ストリームの場合、highWaterMark
は UTF-16 コード単位の総数を指定します。
データは、実装がstream.push(chunk)
を呼び出したときにReadable
ストリームにバッファされます。ストリームのコンシューマがstream.read()
を呼び出さない場合、データは消費されるまで内部キューに残ります。
内部読み込みバッファの合計サイズがhighWaterMark
で指定されたしきい値に達すると、現在バッファされているデータが消費されるまで(つまり、読み込みバッファを埋めるために使用される内部readable._read()
メソッドの呼び出しをストリームが停止するまで)、ストリームは基になるリソースからのデータの読み取りを一時的に停止します。
writable.write(chunk)
メソッドが繰り返し呼び出されると、データはWritable
ストリームにバッファされます。内部書き込みバッファの合計サイズがhighWaterMark
で設定されたしきい値を下回っている間は、writable.write()
への呼び出しはtrue
を返します。内部バッファのサイズがhighWaterMark
に達するかそれを超えると、false
が返されます。
stream
API、特にstream.pipe()
メソッドの重要な目標は、速度の異なるソースと宛先が使用可能なメモリを圧倒しないように、データのバッファリングを許容レベルに制限することです。
highWaterMark
オプションは制限ではなくしきい値です。ストリームがさらにデータの要求を停止する前にバッファするデータの量を指示します。一般的に厳格なメモリ制限を適用するものではありません。特定のストリーム実装はより厳格な制限を適用することを選択できますが、それはオプションです。
Duplex
とTransform
ストリームはどちらもReadable
とWritable
であるため、それぞれ読み取りと書き込みに使用される2 つの個別の内部バッファを保持し、各側が適切で効率的なデータの流れを維持しながら、互いに独立して動作できます。たとえば、net.Socket
インスタンスはDuplex
ストリームであり、そのReadable
側はソケットから受信したデータの消費を許可し、Writable
側はソケットへのデータの書き込みを許可します。データはソケットにデータが受信される速度よりも速くまたは遅く書き込まれる可能性があるため、各側は互いに独立して動作(およびバッファ)する必要があります。
内部バッファリングのメカニズムは内部実装の詳細であり、いつでも変更される可能性があります。ただし、特定の高度な実装では、writable.writableBuffer
またはreadable.readableBuffer
を使用して内部バッファを取得できます。これらの文書化されていないプロパティの使用は推奨されません。
ストリームコンシューマー用 API
どんなに単純な Node.js アプリケーションでも、ほとんどの場合何らかの形でストリームを使用します。Node.js アプリケーションで HTTP サーバーを実装する際のストリームの使用例を以下に示します。
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` は http.IncomingMessage であり、読み込み可能なストリームです。
// `res` は http.ServerResponse であり、書き込み可能なストリームです。
let body = ''
// データを utf8 文字列として取得します。
// エンコーディングが設定されていない場合、Bufferオブジェクトが受信されます。
req.setEncoding('utf8')
// 読み込み可能なストリームは、リスナーが追加されると 'data' イベントを発生させます。
req.on('data', chunk => {
body += chunk
})
// 'end' イベントは、本体全体が受信されたことを示します。
req.on('end', () => {
try {
const data = JSON.parse(body)
// ユーザーに何か興味深いものを書き戻します。
res.write(typeof data)
res.end()
} catch (er) {
// うわあ!不正なJSON!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable
ストリーム(例では res
)は、ストリームにデータ書き込むために使用される write()
や end()
などのメソッドを公開しています。
Readable
ストリームは、EventEmitter
API を使用して、ストリームから読み取ることができるデータが利用可能になったときにアプリケーションコードに通知します。利用可能なデータは、複数の方法でストリームから読み取ることができます。
Writable
ストリームと Readable
ストリームの両方とも、EventEmitter
API をさまざまな方法で使用して、ストリームの現在の状態を伝達します。
Duplex
ストリームと Transform
ストリームは、両方とも Writable
ストリームと Readable
ストリームです。
ストリームにデータの書き込みまたはストリームからのデータの消費を行うアプリケーションは、ストリームインターフェースを直接実装する必要はなく、一般的に require('node:stream')
を呼び出す理由はありません。
新しいタイプのストリームを実装したい開発者は、ストリーム実装者向け API のセクションを参照してください。
書き込みストリーム
書き込みストリームは、データが書き込まれる宛先の抽象化です。
Writable
ストリームの例には、以下が含まれます。
- クライアント側の HTTP リクエスト
- サーバー側の HTTP レスポンス
- fs 書き込みストリーム
- zlib ストリーム
- crypto ストリーム
- TCP ソケット
- 子プロセスの stdin
process.stdout
、process.stderr
これらの例の一部は、実際にはDuplex
ストリームであり、Writable
インターフェースを実装しています。
すべてのWritable
ストリームは、stream.Writable
クラスによって定義されたインターフェースを実装します。
Writable
ストリームの具体的なインスタンスはさまざまな点で異なる場合がありますが、すべてのWritable
ストリームは、以下の例で示されているのと同じ基本的な使用方法のパターンに従います。
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
クラス: stream.Writable
追加されたバージョン: v0.9.4
イベント: 'close'
[履歴]
バージョン | 変更 |
---|---|
v10.0.0 | emitClose オプションを追加して、'close' が破棄時に発行されるかどうかを指定します。 |
v0.9.4 | 追加されたバージョン: v0.9.4 |
'close'
イベントは、ストリームとその基礎となるリソース(ファイル記述子など)が閉じられたときに発生します。このイベントは、これ以上のイベントが発行されず、それ以上の計算が行われないことを示します。
Writable
ストリームは、emitClose
オプションを指定して作成された場合は常に'close'
イベントを発行します。
イベント: 'drain'
追加されたバージョン: v0.9.4
stream.write(chunk)
への呼び出しがfalse
を返す場合、ストリームへのデータの書き込みを再開するのに適したタイミングで'drain'
イベントが発生します。
// 指定された書き込みストリームに100万回データを書きます。
// バックプレッシャーに注意してください。
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// 最後の回!
writer.write(data, encoding, callback)
} else {
// 継続すべきか、待つかを確認します。
// まだ完了していないため、コールバックは渡しません。
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// 途中で停止しなければなりませんでした!
// ドレインされたらさらに書き込みます。
writer.once('drain', write)
}
}
}
イベント: 'error'
追加バージョン: v0.9.4
'error'
イベントは、データの書き込みまたはパイプ処理中にエラーが発生した場合に発行されます。リスナーコールバックは呼び出されるときに単一の Error
引数を渡されます。
'error'
イベントが発行されると、ストリームは閉じられます。ただし、ストリームの作成時に autoDestroy
オプションが false
に設定されている場合は例外です。
'error'
以降は、'close'
以外のイベント('error'
イベントを含む)は発行されません はずです。
イベント: 'finish'
追加バージョン: v0.9.4
'finish'
イベントは、stream.end()
メソッドが呼び出され、すべてのデータが基になるシステムにフラッシュされた後に発行されます。
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('All writes are now complete.')
})
writer.end('This is the end\n')
イベント: 'pipe'
追加バージョン: v0.9.4
src
<stream.Readable> この書き込み可能ストリームにパイプしているソースストリーム
'pipe'
イベントは、読み込み可能ストリームで stream.pipe()
メソッドが呼び出され、この書き込み可能ストリームがその宛先のセットに追加されたときに発行されます。
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Something is piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
イベント: 'unpipe'
追加バージョン: v0.9.4
src
<stream.Readable> この書き込み可能ストリームから アンパイプ されたソースストリーム
'unpipe'
イベントは、Readable
ストリームで stream.unpipe()
メソッドが呼び出され、この Writable
がその宛先のセットから削除されたときに発行されます。
これは、Readable
ストリームがこの Writable
ストリームにパイプされたときにエラーが発生した場合にも発行されます。
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Something has stopped piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
追加されたバージョン: v0.11.2
writable.cork()
メソッドは、書き込まれたすべてのデータをメモリにバッファリングします。バッファされたデータは、stream.uncork()
またはstream.end()
メソッドが呼び出されたときにフラッシュされます。
writable.cork()
の主な目的は、複数の小さなチャンクが短時間に連続してストリームに書き込まれる状況に対応することです。writable.cork()
は、それらを基になる宛先にすぐに転送する代わりに、writable.uncork()
が呼び出されるまですべてのチャンクをバッファリングし、writable._writev()
が存在する場合は、それらをすべてwritable._writev()
に渡します。これにより、最初の小さなチャンクの処理を待っている間にデータがバッファリングされるヘッドオブラインブロッキングが発生するのを防ぎます。ただし、writable._writev()
を実装せずにwritable.cork()
を使用すると、スループットに悪影響を与える可能性があります。
こちらも参照してください:writable.uncork()
、writable._writev()
。
writable.destroy([error])
[履歴]
バージョン | 変更点 |
---|---|
v14.0.0 | すでに破棄されているストリームでは何もしない動作になります。 |
v8.0.0 | 追加されたバージョン: v8.0.0 |
ストリームを破棄します。オプションで'error'
イベントを発生させ、emitClose
がfalse
に設定されていない限り'close'
イベントを発生させます。この呼び出し後、書き込みストリームは終了し、その後のwrite()
またはend()
への呼び出しはERR_STREAM_DESTROYED
エラーになります。これは、ストリームを破壊する破壊的で即時的な方法です。以前のwrite()
への呼び出しはフラッシュされていない可能性があり、ERR_STREAM_DESTROYED
エラーをトリガーする可能性があります。データが閉じる前にフラッシュする必要がある場合、またはストリームを破棄する前に'drain'
イベントを待つ場合は、destroy
の代わりにend()
を使用してください。
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
destroy()
が呼び出されると、それ以降の呼び出しは何もしなくなり、_destroy()
からのエラーを除いて、'error'
としてさらにエラーは発生しません。
実装者はこのメソッドをオーバーライドするのではなく、writable._destroy()
を実装する必要があります。
writable.closed
追加日時: v18.0.0
'close'
が emit された後、true
になります。
writable.destroyed
追加日時: v8.0.0
writable.destroy()
が呼び出された後、true
になります。
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[履歴]
バージョン | 変更内容 |
---|---|
v22.0.0, v20.13.0 | chunk 引数にTypedArray またはDataView インスタンスを使用できるようになりました。 |
v15.0.0 | callback は'finish'の前に、またはエラー時に呼び出されるようになりました。 |
v14.0.0 | callback は'finish'または'error'が emit された場合に呼び出されるようになりました。 |
v10.0.0 | このメソッドはwritable への参照を返すようになりました。 |
v8.0.0 | chunk 引数にUint8Array インスタンスを使用できるようになりました。 |
v0.9.4 | 追加日時: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 書き込むオプションデータ。オブジェクトモードでないストリームの場合、chunk
は<string>、<Buffer>、<TypedArray>、または<DataView>でなければなりません。オブジェクトモードのストリームの場合、chunk
はnull
以外の任意の JavaScript 値にすることができます。encoding
<string>chunk
が文字列の場合のエンコーディングcallback
<Function> ストリームが終了したときのコールバック。- 戻り値: <this>
writable.end()
メソッドを呼び出すと、Writable
にデータが書き込まれなくなることを示します。オプションのchunk
とencoding
引数を使用すると、ストリームを閉じる直前に、最終的な追加データチャンクを書き込むことができます。
stream.end()
を呼び出した後にstream.write()
メソッドを呼び出すと、エラーが発生します。
// 'hello, 'を書き込み、その後'world!'で終了します。
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// 今後の書き込みは許可されません!
writable.setDefaultEncoding(encoding)
[履歴]
バージョン | 変更 |
---|---|
v6.1.0 | このメソッドは、writable への参照を返すようになりました。 |
v0.11.15 | 追加: v0.11.15 |
writable.setDefaultEncoding()
メソッドは、Writable
ストリームのデフォルトのencoding
を設定します。
writable.uncork()
追加: v0.11.2
writable.uncork()
メソッドは、stream.cork()
が呼び出されてからバッファリングされたすべてのデータをフラッシュします。
writable.cork()
とwritable.uncork()
を使用してストリームへの書き込みのバッファリングを管理する場合は、process.nextTick()
を使用してwritable.uncork()
への呼び出しを遅延させます。これにより、特定の Node.js イベントループフェーズ内で発生するすべてのwritable.write()
呼び出しのバッチ処理が可能になります。
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
writable.cork()
メソッドがストリームに対して複数回呼び出された場合、バッファリングされたデータをフラッシュするには、同じ数のwritable.uncork()
呼び出しを行う必要があります。
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// データは、uncork()が2回目に呼び出されるまでフラッシュされません。
stream.uncork()
})
こちらも参照: writable.cork()
。
writable.writable
追加: v11.4.0
ストリームが破棄、エラー、または終了していないことを意味し、writable.write()
を呼び出すことが安全な場合はtrue
です。
writable.writableAborted
追加: v18.0.0, v16.17.0
'finish'
の発生前にストリームが破棄またはエラーになったかどうかを返します。
writable.writableEnded
追加時期: v12.9.0
writable.end()
が呼び出された後、true
になります。このプロパティはデータがフラッシュされたかどうかを示すものではなく、そのためには代わりに writable.writableFinished
を使用してください。
writable.writableCorked
追加時期: v13.2.0, v12.16.0
ストリームを完全にアンコルクするためにwritable.uncork()
を呼び出す必要がある回数。
writable.errored
追加時期: v18.0.0
ストリームがエラーで破棄された場合にエラーを返します。
writable.writableFinished
追加時期: v12.6.0
'finish'
イベントが emit される直前に true
に設定されます。
writable.writableHighWaterMark
追加時期: v9.3.0
この Writable
を作成時に渡された highWaterMark
の値を返します。
writable.writableLength
追加時期: v9.4.0
このプロパティには、書き込み準備ができたキュー内のバイト数(またはオブジェクト数)が含まれています。この値は、highWaterMark
の状態に関する自己検査データを提供します。
writable.writableNeedDrain
追加時期: v15.2.0, v14.17.0
ストリームのバッファがいっぱいになり、ストリームが 'drain'
を emit する場合、true
になります。
writable.writableObjectMode
追加日時: v12.3.0
指定されたWritable
ストリームのobjectMode
プロパティのゲッター。
writable[Symbol.asyncDispose]()
追加日時: v22.4.0, v20.16.0
AbortError
を用いて writable.destroy()
を呼び出し、ストリームが終了した時に解決される Promise を返します。
writable.write(chunk[, encoding][, callback])
[履歴]
バージョン | 変更 |
---|---|
v22.0.0, v20.13.0 | chunk 引数にTypedArray またはDataView インスタンスを使用できるようになりました。 |
v8.0.0 | chunk 引数にUint8Array インスタンスを使用できるようになりました。 |
v6.0.0 | オブジェクトモードでもchunk パラメータにnull を渡すと常に無効と見なされるようになりました。 |
v0.9.4 | 追加日時: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 書き込むオプションデータ。オブジェクトモードで動作していないストリームの場合、chunk
は <string>、 <Buffer>、 <TypedArray>、または <DataView> でなければなりません。オブジェクトモードストリームの場合、chunk
はnull
以外の任意の JavaScript 値にすることができます。encoding
<string> | <null>chunk
が文字列の場合のエンコーディング。 デフォルト:'utf8'
callback
<Function> このチャンクのデータがフラッシュされたときのコールバック。- 戻り値: <boolean> ストリームが呼び出し元のコードに追加のデータの書き込みを続ける前に
'drain'
イベントの発生を待つことを希望する場合false
。それ以外の場合はtrue
。
writable.write()
メソッドはストリームにデータ書き込み、データが完全に処理された後、指定されたcallback
を呼び出します。エラーが発生した場合、callback
はエラーを最初の引数として呼び出されます。callback
は非同期的に、そして'error'
が emit される前に呼び出されます。
戻り値は、chunk
を受け入れた後、ストリーム作成時に設定されたhighWaterMark
よりも内部バッファが小さい場合にtrue
です。false
が返された場合、'drain'
イベントが emit されるまで、ストリームへのデータの書き込みのさらなる試行を停止する必要があります。
ストリームが排水されていない間、write()
への呼び出しはchunk
をバッファリングし、false を返します。現在バッファリングされているすべてのチャンクが排水されると(オペレーティングシステムによって配信のために受け入れられると)、'drain'
イベントが emit されます。write()
が false を返すようになったら、'drain'
イベントが emit されるまで、さらにチャンクを書き込まないでください。排水されていないストリームに対してwrite()
を呼び出すことは許可されていますが、Node.js は最大メモリ使用量が発生するまで、書き込まれたすべてのチャンクをバッファリングし、その時点で無条件に中止します。中止する前でも、メモリ使用量が多いと、ガベージコレクタのパフォーマンスが悪くなり、RSS が高くなります(メモリが不要になった後でも、通常はシステムに返却されません)。TCP ソケットは、リモートピアがデータを読み取らない場合、排水されない可能性があるため、排水されていないソケットに書き込むと、リモートで悪用可能な脆弱性につながる可能性があります。
ストリームが排水されていない間にデータの書き込みを行うことは、Transform
の場合に特に問題があります。なぜなら、Transform
ストリームは、パイプされるか、'data'
または'readable'
イベントハンドラが追加されるまで、デフォルトで一時停止されるからです。
書き込むデータをオンデマンドで生成または取得できる場合は、ロジックをReadable
にカプセル化し、stream.pipe()
を使用することをお勧めします。ただし、write()
の呼び出しが優先される場合は、'drain'
イベントを使用してバックプレッシャーを尊重し、メモリの問題を回避することができます。
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// cbが呼び出されるのを待つまで、他の書き込みは行わない。
write('hello', () => {
console.log('書き込みが完了しました。さらに書き込みを実行します。')
})
オブジェクトモードのWritable
ストリームは、常にencoding
引数を無視します。
読み込み可能ストリーム
読み込み可能ストリームは、データが消費される ソース の抽象化です。
Readable
ストリームの例:
- クライアント側の HTTP レスポンス
- サーバー側の HTTP リクエスト
- fs 読み込みストリーム
- zlib ストリーム
- crypto ストリーム
- TCP ソケット
- 子プロセスの stdout と stderr
process.stdin
すべてのReadable
ストリームは、stream.Readable
クラスによって定義されたインターフェースを実装します。
2 つの読み込みモード
Readable
ストリームは、事実上、フローモードと一時停止モードの 2 つのモードのいずれかで動作します。これらのモードは、オブジェクトモードとは別です。Readable
ストリームは、フローモードか一時停止モードかに関係なく、オブジェクトモードであるかどうかにかかわらず動作します。
- フローモードでは、データは基となるシステムから自動的に読み取られ、
EventEmitter
インターフェースを介したイベントを使用して、できるだけ早くアプリケーションに提供されます。 - 一時停止モードでは、
stream.read()
メソッドを明示的に呼び出して、ストリームからデータのチャンクを読み取る必要があります。
すべてのReadable
ストリームは一時停止モードで開始されますが、次のいずれかの方法でフローモードに切り替えることができます。
'data'
イベントハンドラーを追加する。stream.resume()
メソッドを呼び出す。stream.pipe()
メソッドを呼び出して、データをWritable
に送信する。
Readable
は、次のいずれかの方法を使用して一時停止モードに戻すことができます。
- パイプの宛先がない場合は、
stream.pause()
メソッドを呼び出す。 - パイプの宛先がある場合は、すべてのパイプの宛先を削除する。複数のパイプの宛先は、
stream.unpipe()
メソッドを呼び出すことで削除できます。
重要な点は、データの消費または無視のメカニズムが提供されるまで、Readable
はデータ生成を行わないということです。消費メカニズムが無効化または削除されると、Readable
はデータの生成を 停止しようとします。
下位互換性のため、'data'
イベントハンドラーを削除しても、ストリームは自動的に一時停止されません。また、パイプされた宛先がある場合、stream.pause()
を呼び出しても、それらの宛先がドレインしてさらにデータを求めた後にストリームが一時停止されたままになる保証はありません。
Readable
がフローモードに切り替えられ、データ処理可能なコンシューマーがない場合、そのデータは失われます。これは、たとえば、'data'
イベントにリスナーを接続せずにreadable.resume()
メソッドが呼び出された場合、または'data'
イベントハンドラーがストリームから削除された場合に発生する可能性があります。
'readable'
イベントハンドラーを追加すると、ストリームは自動的にフローを停止し、データはreadable.read()
を使用して消費する必要があります。'readable'
イベントハンドラーが削除されると、'data'
イベントハンドラーがある場合、ストリームは再びフローを開始します。
3 つの状態
Readable
ストリームの「2 つのモード」の動作は、Readable
ストリームの実装内で発生しているより複雑な内部状態管理に対する簡略化された抽象化です。
具体的には、任意の時点で、すべてのReadable
は次の 3 つの可能な状態のいずれかになります。
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
readable.readableFlowing
が null
の場合、ストリームのデータを使用するためのメカニズムは提供されません。したがって、ストリームはデータを作成しません。この状態では、'data'
イベントのリスナーをアタッチしたり、readable.pipe()
メソッドを呼び出したり、readable.resume()
メソッドを呼び出したりすると、readable.readableFlowing
が true
に切り替わり、Readable
はデータが生成されると積極的にイベントを発行し始めます。
readable.pause()
、readable.unpipe()
を呼び出すか、バックプレッシャーを受け取ると、readable.readableFlowing
は false
に設定され、イベントのフローは一時的に停止しますが、データの生成は停止しません。この状態では、'data'
イベントのリスナーをアタッチしても、readable.readableFlowing
は true
に切り替わりません。
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.
readable.readableFlowing
が false
の間、データはストリームの内部バッファに蓄積されている可能性があります。
API スタイルの選択
Readable
ストリーム API は、複数の Node.js バージョンで進化しており、ストリームデータを使用するための複数のメソッドを提供しています。一般的に、開発者はデータを使用するメソッドを1 つ選択する必要があり、単一のストリームからデータを使用するために複数のメソッドを使用するべきではありません。具体的には、on('data')
、on('readable')
、pipe()
、または非同期イテレータを組み合わせて使用すると、直感に反する動作につながる可能性があります。
クラス: stream.Readable
追加されたバージョン: v0.9.4
イベント: 'close'
[履歴]
バージョン | 変更 |
---|---|
v10.0.0 | emitClose オプションを追加して、'close' イベントが破棄時に emit されるかどうかを指定できるようにしました。 |
v0.9.4 | v0.9.4 で追加 |
'close'
イベントは、ストリームとその基礎となるリソース(ファイル記述子など)が閉じられたときに発生します。このイベントは、それ以上イベントが発生せず、それ以上の計算が行われないことを示します。
Readable
ストリームは、emitClose
オプションを指定して作成された場合、常に 'close'
イベントを発生させます。
イベント: 'data'
追加されたバージョン: v0.9.4
chunk
<Buffer> | <string> | <any> データのチャンク。オブジェクトモードで動作していないストリームの場合、チャンクは文字列またはBuffer
になります。オブジェクトモードのストリームの場合、チャンクはnull
以外の任意の JavaScript 値にすることができます。
'data'
イベントは、ストリームがデータのチャンクの所有権をコンシューマーに譲渡するたびに発生します。これは、readable.pipe()
、readable.resume()
を呼び出すことによって、または 'data'
イベントにリスナーコールバックをアタッチすることによって、ストリームがフローイングモードに切り替えられるたびに発生する可能性があります。'data'
イベントは、readable.read()
メソッドが呼び出され、返されるデータのチャンクが利用可能な場合にも発生します。
明示的に一時停止されていないストリームに 'data'
イベントリスナーをアタッチすると、ストリームはフローイングモードに切り替わります。データは利用可能になり次第渡されます。
リスナーコールバックには、readable.setEncoding()
メソッドを使用してストリームにデフォルトのエンコーディングが指定されている場合は文字列として、そうでない場合は Buffer
として、データのチャンクが渡されます。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
イベント: 'end'
追加バージョン: v0.9.4
'end'
イベントは、ストリームから消費できるデータがなくなった時に発生します。
'end'
イベントは、データが完全に消費されない限り発生しません。これは、ストリームをフローイングモードに切り替えるか、stream.read()
をデータがすべて消費されるまで繰り返し呼び出すことで実現できます。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
console.log('There will be no more data.')
})
イベント: 'error'
追加バージョン: v0.9.4
'error'
イベントは、Readable
実装によっていつでも発生する可能性があります。通常、これは基盤となるストリームが内部エラーのためにデータ生成できない場合、またはストリーム実装が無効なデータチャンクをプッシュしようとした場合に発生する可能性があります。
リスナーコールバックには、単一の Error
オブジェクトが渡されます。
イベント: 'pause'
追加バージョン: v0.9.4
'pause'
イベントは、stream.pause()
が呼び出され、readableFlowing
が false
でない場合に発生します。
イベント: 'readable'
[履歴]
バージョン | 変更 |
---|---|
v10.0.0 | 'readable' は .push() が呼び出された後の次のティックで常に発生します。 |
v10.0.0 | 'readable' を使用するには .read() を呼び出す必要があります。 |
v0.9.4 | 追加バージョン: v0.9.4 |
'readable'
イベントは、ストリームから読み取ることができるデータが、設定されたハイウォーターマーク(state.highWaterMark
)まで利用可能になったときに発生します。事実上、ストリームがバッファ内に新しい情報を持っていることを示します。このバッファ内にデータがあれば、stream.read()
を呼び出してそのデータを取得できます。さらに、'readable'
イベントは、ストリームの終端に達した場合にも発生する可能性があります。
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// There is some data to read now.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
ストリームの終端に達した場合、stream.read()
を呼び出すと null
が返され、'end'
イベントが発生します。読み取るデータがまったくなかった場合も同様です。例えば、次の例では、foo.txt
は空ファイルです。
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
このスクリプトを実行した出力は次のとおりです。
$ node test.js
readable: null
end
場合によっては、'readable'
イベントのリスナーをアタッチすると、内部バッファに一定量のデータが読み込まれます。
一般的に、readable.pipe()
と 'data'
イベントメカニズムは 'readable'
イベントよりも理解しやすいです。しかし、'readable'
を処理するとスループットが向上する可能性があります。
'readable'
と 'data'
の両方が同時に使用されている場合、'readable'
がフロー制御で優先されます。つまり、stream.read()
が呼び出された場合にのみ 'data'
が発生します。readableFlowing
プロパティは false
になります。'readable'
が削除されたときに 'data'
リスナーが存在する場合、ストリームはフローイングを開始します。つまり、.resume()
を呼び出さずに 'data'
イベントが発生します。
イベント: 'resume'
追加されたバージョン: v0.9.4
'resume'
イベントは、stream.resume()
が呼び出され、readableFlowing
が true
でない場合に発生します。
readable.destroy([error])
[履歴]
バージョン | 変更点 |
---|---|
v14.0.0 | すでに破棄されているストリームに対しては何もしない動作になります。 |
v8.0.0 | 追加されたバージョン: v8.0.0 |
ストリームを破棄します。オプションで 'error'
イベントと 'close'
イベントを発生させます(emitClose
が false
に設定されている場合を除きます)。この呼び出しの後、読み込みストリームは内部リソースを解放し、その後の push()
への呼び出しは無視されます。
destroy()
が呼び出された後は、それ以降の呼び出しは何もしない動作になり、_destroy()
からのエラーを除いて、'error'
としてさらにエラーが発生することはありません。
実装者はこのメソッドをオーバーライドするのではなく、readable._destroy()
を実装する必要があります。
readable.closed
追加されたバージョン: v18.0.0
'close'
が発生した後は true
になります。
readable.destroyed
追加されたバージョン: v8.0.0
readable.destroy()
が呼び出された後は true
になります。
readable.isPaused()
追加されたバージョン: v0.11.14
- 戻り値: <boolean>
readable.isPaused()
メソッドは、Readable
の現在の動作状態を返します。これは主に、readable.pipe()
メソッドの基礎となるメカニズムによって使用されます。ほとんどの典型的なケースでは、このメソッドを直接使用する理由はほとんどありません。
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
追加: v0.9.4
- 戻り値: <this>
readable.pause()
メソッドは、フローモードにあるストリームにおいて'data'
イベントの発行を停止し、フローモードから切り替えます。利用可能になるデータは、内部バッファに残ります。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
readable.pause()
console.log('There will be no additional data for 1 second.')
setTimeout(() => {
console.log('Now data will start flowing again.')
readable.resume()
}, 1000)
})
'readable'
イベントリスナーが存在する場合は、readable.pause()
メソッドは効果がありません。
readable.pipe(destination[, options])
追加: v0.9.4
destination
<stream.Writable> データ書き込み先の宛先options
<Object> パイプオプションend
<boolean> リーダーが終了したときにライターを終了します。デフォルト:true
。
戻り値: <stream.Writable>
Duplex
またはTransform
ストリームである場合、パイプの連鎖を可能にする宛先。
readable.pipe()
メソッドは、Writable
ストリームをreadable
に接続し、自動的にフローモードに切り替えて、そのすべてのデータを接続されたWritable
にプッシュします。データのフローは、宛先のWritable
ストリームがより高速なReadable
ストリームによって圧倒されないように、自動的に管理されます。
次の例では、readable
からのすべてのデータをfile.txt
という名前のファイルにパイプします。
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readableからのすべてのデータは'file.txt'に入ります。
readable.pipe(writable)
1 つのReadable
ストリームに複数のWritable
ストリームを接続できます。
readable.pipe()
メソッドは宛先ストリームへの参照を返し、パイプされたストリームの連鎖を設定することを可能にします。
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
デフォルトでは、ソースReadable
ストリームが'end'
を発行すると、宛先のWritable
ストリームでstream.end()
が呼び出され、宛先は書き込み不可になります。このデフォルトの動作を無効にするには、end
オプションをfalse
として渡すと、宛先ストリームは開いたままになります。
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
重要な注意点として、処理中にReadable
ストリームがエラーを発行した場合、Writable
の宛先は自動的に閉じられません。エラーが発生した場合は、メモリリークを防ぐために、各ストリームを手動で閉じる必要があります。
指定されたオプションに関係なく、process.stderr
とprocess.stdout
Writable
ストリームは、Node.js プロセスが終了するまで決して閉じられません。
readable.read([size])
追加日:v0.9.4
readable.read()
メソッドは、内部バッファからデータを読み出して返します。読み取るデータがない場合、null
を返します。readable.setEncoding()
メソッドを使用してエンコードが指定されている場合、またはストリームがオブジェクトモードで動作している場合を除き、データはデフォルトで Buffer
オブジェクトとして返されます。
オプションの size
引数は、読み取るバイト数を指定します。size
バイトを読み取ることができない場合、ストリームが終了している場合を除き、null
が返されます。ストリームが終了している場合は、内部バッファに残っているすべてのデータが返されます。
size
引数を指定しない場合、内部バッファに含まれるすべてのデータが返されます。
size
引数は、1 GiB 以下である必要があります。
readable.read()
メソッドは、一時停止モードで動作している Readable
ストリームに対してのみ呼び出す必要があります。フローイングモードでは、内部バッファが完全に空になるまで readable.read()
が自動的に呼び出されます。
const readable = getReadableStreamSomehow()
// データがバッファにバッファリングされると、'readable' が複数回トリガーされる可能性があります
readable.on('readable', () => {
let chunk
console.log('ストリームは読み取り可能です(バッファに新しいデータを受信)')
// すべて利用可能なデータを読み取るためにループを使用します
while (null !== (chunk = readable.read())) {
console.log(`データの ${chunk.length} バイトを読み取りました...`)
}
})
// 利用可能なデータがなくなると、'end' が1回トリガーされます
readable.on('end', () => {
console.log('ストリームの終わりに到達しました。')
})
readable.read()
を呼び出すたびに、データのチャンクまたは null
が返され、その時点で読み取るデータがなくなったことを示します。これらのチャンクは自動的に連結されません。1 回の read()
呼び出しですべてのデータが返されるわけではないため、すべてのデータを取得するまでチャンクを継続的に読み取るには、while ループが必要になる場合があります。大きなファイルを読み取るとき、.read()
は一時的に null
を返すことがあり、バッファされたすべてのコンテンツが消費されたが、さらにバッファされるデータがある可能性があることを示します。このような場合、バッファにもうデータがあるとその時点で新しい 'readable'
イベントが発行され、'end'
イベントはデータ送信の終了を示します。
したがって、readable
からファイルの全内容を読み取るには、複数の 'readable'
イベントにわたってチャンクを収集する必要があります。
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
オブジェクトモードの Readable
ストリームは、size
引数の値に関係なく、readable.read(size)
を呼び出すと常に 1 つのアイテムを返します。
readable.read()
メソッドがデータのチャンクを返す場合、'data'
イベントも発行されます。
'end'
イベントが発行された後に stream.read([size])
を呼び出すと、null
が返されます。ランタイムエラーは発生しません。
readable.readable
追加:v11.4.0
readable.read()
の呼び出しが安全かどうかを示す true
です。これは、ストリームが破棄されていないか、'error'
または 'end'
を emit していないことを意味します。
readable.readableAborted
追加:v16.8.0
'end'
を emit する前にストリームが破棄されたか、エラーが発生したかどうかを返します。
readable.readableDidRead
追加:v16.7.0, v14.18.0
'data'
が emit されたかどうかを返します。
readable.readableEncoding
追加:v12.7.0
指定された Readable
ストリームの encoding
プロパティのゲッターです。encoding
プロパティは、readable.setEncoding()
メソッドを使用して設定できます。
readable.readableEnded
追加:v12.9.0
'end'
イベントが emit されると true
になります。
readable.errored
追加:v18.0.0
ストリームがエラーで破棄された場合にエラーを返します。
readable.readableFlowing
追加:v9.4.0
このプロパティは、3 つの状態 セクションで説明されているように、Readable
ストリームの現在の状態を反映します。
readable.readableHighWaterMark
追加日時: v9.3.0
この Readable
の作成時に渡された highWaterMark
の値を返します。
readable.readableLength
追加日時: v9.4.0
このプロパティは、読み取り準備ができたキュー内のバイト数(またはオブジェクト数)を含みます。この値は、highWaterMark
の状態に関するイントロスペクションデータを提供します。
readable.readableObjectMode
追加日時: v12.3.0
指定された Readable
ストリームの objectMode
プロパティのゲッターです。
readable.resume()
[履歴]
バージョン | 変更点 |
---|---|
v10.0.0 | 'readable' イベントリスナーが存在する場合は、resume() は効果がありません。 |
v0.9.4 | 追加日時: v0.9.4 |
- 戻り値: <this>
readable.resume()
メソッドは、明示的に一時停止された Readable
ストリームを再開して 'data'
イベントの送出を再開し、ストリームをフローイングモードに切り替えます。
readable.resume()
メソッドは、データ処理を行わずにストリームからデータを完全に消費するために使用できます。
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.')
})
'readable'
イベントリスナーが存在する場合は、readable.resume()
メソッドは効果がありません。
readable.setEncoding(encoding)
追加日時: v0.9.4
readable.setEncoding()
メソッドは、Readable
ストリームから読み取られるデータの文字エンコーディングを設定します。
デフォルトでは、エンコーディングは割り当てられず、ストリームデータは Buffer
オブジェクトとして返されます。エンコーディングを設定すると、ストリームデータは Buffer
オブジェクトではなく、指定されたエンコーディングの文字列として返されるようになります。たとえば、readable.setEncoding('utf8')
を呼び出すと、出力データは UTF-8 データとして解釈され、文字列として渡されます。readable.setEncoding('hex')
を呼び出すと、データは 16 進数の文字列形式でエンコードされます。
Readable
ストリームは、ストリームから Buffer
オブジェクトとして単純に取得した場合に正しくデコードされなくなる可能性のある、ストリームを介して配信されるマルチバイト文字を適切に処理します。
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Got %d characters of string data:', chunk.length)
})
readable.unpipe([destination])
追加日時: v0.9.4
destination
<stream.Writable> オプション。アンパイプする特定のストリーム- 戻り値: <this>
readable.unpipe()
メソッドは、stream.pipe()
メソッドを使用して以前に接続された Writable
ストリームを切り離します。
destination
が指定されていない場合、すべてのパイプが切り離されます。
destination
が指定されているが、それにパイプが設定されていない場合、メソッドは何もしません。
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readableからのすべてのデータは'file.txt'に入りますが、最初の1秒間のみです。
readable.pipe(writable)
setTimeout(() => {
console.log('Stop writing to file.txt.')
readable.unpipe(writable)
console.log('Manually close the file stream.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[履歴]
バージョン | 変更点 |
---|---|
v22.0.0, v20.13.0 | chunk 引数に TypedArray または DataView インスタンスを使用できるようになりました。 |
v8.0.0 | chunk 引数に Uint8Array インスタンスを使用できるようになりました。 |
v0.9.11 | 追加日時: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 読み込みキューの先頭に挿入するデータのチャンク。オブジェクトモードではないストリームの場合、chunk
は <string>、<Buffer>、<TypedArray>、<DataView> またはnull
でなければなりません。オブジェクトモードストリームの場合、chunk
は任意の JavaScript 値にすることができます。encoding
<string> 文字列チャンクのエンコーディング。'utf8'
や'ascii'
などの有効なBuffer
エンコーディングでなければなりません。
chunk
を null
として渡すと、ストリームの終わり (EOF) を示し、readable.push(null)
と同じ動作になり、それ以降はデータを書込むことができなくなります。EOF シグナルはバッファの最後に置かれ、バッファリングされたデータはすべてフラッシュされます。
readable.unshift()
メソッドは、データのチャンクを内部バッファにプッシュします。これは、ソースから楽観的に一部のデータを取り出したコードによってストリームが消費されている特定の状況で役立ちます。これにより、データを他の当事者に渡すことができます。
stream.unshift(chunk)
メソッドは、'end'
イベントが発行された後では呼び出すことができず、ランタイムエラーが発生します。
stream.unshift()
を使用している開発者は、代わりに Transform
ストリームの使用に切り替えることを検討する必要があります。「ストリーム実装者向けの API」セクションを参照してください。
// \n\nで区切られたヘッダーを取り出します。
// 多すぎる場合はunshift()を使用します。
// (error, header, stream)でコールバックを呼び出します。
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// ヘッダー境界が見つかりました。
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// unshiftする前に'readable'リスナーを削除します。
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// メッセージの本文はストリームから読み取ることができます。
callback(null, header, stream)
return
}
// ヘッダーを読み取り中です。
header += str
}
}
}
stream.push(chunk)
とは異なり、stream.unshift(chunk)
はストリームの内部読み込み状態をリセットすることによって読み込みプロセスを終了しません。これは、読み込み中に(つまり、カスタムストリームの stream._read()
実装内から)readable.unshift()
が呼び出された場合に予期しない結果を引き起こす可能性があります。readable.unshift()
の呼び出し後にすぐに stream.push('')
を実行すると、読み込み状態が適切にリセットされますが、読み込みの実行中に readable.unshift()
を呼び出さない方が最善です。
readable.wrap(stream)
追加されたバージョン: v0.9.4
Node.js 0.10 より前では、ストリームは現在のnode:stream
モジュール API 全体を実装していませんでした。(詳細は互換性を参照してください。)
'data'
イベントを発行し、アドバイザリのみのstream.pause()
メソッドを持つ古い Node.js ライブラリを使用する場合、readable.wrap()
メソッドを使用して、古いストリームをデータソースとして使用するReadable
ストリームを作成できます。
readable.wrap()
を使用する必要はほとんどありませんが、古い Node.js アプリケーションやライブラリとの対話のための便宜として提供されています。
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // etc.
})
readable[Symbol.asyncIterator]()
[履歴]
バージョン | 変更 |
---|---|
v11.14.0 | Symbol.asyncIterator のサポートは実験的ではなくなりました。 |
v10.0.0 | 追加されたバージョン: v10.0.0 |
- 戻り値: ストリームを完全に消費するための <AsyncIterator>。
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
ループがbreak
、return
、またはthrow
で終了すると、ストリームは破棄されます。言い換えれば、ストリームを反復処理すると、ストリームは完全に消費されます。ストリームはhighWaterMark
オプションに等しいサイズのチャンクで読み取られます。上記のコード例では、highWaterMark
オプションがfs.createReadStream()
に提供されていないため、ファイルのデータが 64 KiB 未満の場合は、データは単一のチャンクになります。
readable[Symbol.asyncDispose]()
追加日時: v20.4.0, v18.18.0
AbortError
を使用して readable.destroy()
を呼び出し、ストリームが終了したときに解決する Promise を返します。
readable.compose(stream[, options])
追加日時: v19.1.0, v18.13.0
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> シグナルが中断された場合にストリームの破棄を許可します。
戻り値: <Duplex> ストリーム
stream
と合成されたストリーム。
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // ['this', 'is', 'compose', 'as', 'operator'] を出力します
詳細は stream.compose
を参照してください。
readable.iterator([options])
追加日時: v16.3.0
options
<Object>destroyOnReturn
<boolean>false
に設定されている場合、非同期イテレーターでreturn
を呼び出したり、break
、return
、throw
を使用してfor await...of
反復処理を終了しても、ストリームは破棄されません。デフォルト:true
。
戻り値: ストリームを使用するための <AsyncIterator>。
このメソッドによって作成されたイテレーターは、for await...of
ループが return
、break
、または throw
によって終了した場合、またはストリームが反復中にエラーを発生させた場合に、ストリームの破棄をキャンセルするオプションをユーザーに提供します。
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 2 と 3 を出力します
}
console.log(readable.destroyed) // true、ストリームは完全に消費されました
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[履歴]
バージョン | 変更 |
---|---|
v20.7.0, v18.19.0 | options にhighWaterMark を追加 |
v17.4.0, v16.14.0 | 追加: v17.4.0, v16.14.0 |
fn
<Function> | <AsyncFunction> ストリーム内の各チャンクに対してマップする関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
呼び出しを早期に中断できます。
options
<Object>concurrency
<number> ストリームに対して一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。highWaterMark
<number> マップされたアイテムのユーザー消費を待っている間にバッファするアイテム数。デフォルト:concurrency * 2 - 1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できます。
戻り値: <Readable> 関数
fn
でマップされたストリーム。
このメソッドを使用すると、ストリームをマップできます。fn
関数は、ストリーム内の各チャンクに対して呼び出されます。fn
関数が Promise を返す場合、その Promise はawait
された後に結果ストリームに渡されます。
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 同期マッパーを使用
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// 非同期マッパーを使用し、最大2つのクエリを同時に実行
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // resolver.resolve4のDNS結果をログ出力します。
}
readable.filter(fn[, options])
[履歴]
バージョン | 変更点 |
---|---|
v20.7.0, v18.19.0 | options に highWaterMark を追加 |
v17.4.0, v16.14.0 | 追加: v17.4.0, v16.14.0 |
fn
<Function> | <AsyncFunction> ストリームからチャンクをフィルタリングする関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中止され、fn
の呼び出しを早期に中止できます。
options
<Object>concurrency
<number> ストリームで同時に呼び出されるfn
の最大同時実行数。デフォルト:1
。highWaterMark
<number> フィルタリングされたアイテムのユーザーによる消費を待っている間にバッファするアイテムの数。デフォルト:concurrency * 2 - 1
。signal
<AbortSignal> シグナルが中止された場合にストリームの破棄を許可します。
戻り値: <Readable> 述語
fn
でフィルタリングされたストリーム。
このメソッドを使用すると、ストリームをフィルタリングできます。ストリーム内の各チャンクに対して fn
関数が呼び出され、真の値を返す場合、そのチャンクは結果ストリームに渡されます。fn
関数が Promise を返す場合、その Promise は await
されます。
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 同期述語を使用する場合。
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// 非同期述語を使用し、最大 2 つのクエリを一度に行う場合。
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// 解決された DNS レコードで 60 秒を超えるドメインをログ出力します。
console.log(result)
}
readable.forEach(fn[, options])
追加: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できます。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できます。
戻り値: <Promise> ストリームが終了した時点の Promise。
このメソッドは、ストリームの反復処理を可能にします。ストリーム内の各チャンクに対して、fn
関数が呼び出されます。fn
関数が Promise を返す場合、その Promise はawait
されます。
このメソッドはfor await...of
ループとは異なり、オプションでチャンクを同時に処理できます。さらに、forEach
反復はsignal
オプションを渡して関連するAbortController
を中断することでのみ停止できますが、for await...of
はbreak
またはreturn
で停止できます。いずれの場合も、ストリームは破棄されます。
このメソッドは'data'
イベントをリスニングするとは異なり、基盤となる仕組みでreadable
イベントを使用し、同時fn
呼び出しの数を制限できます。
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 同期プリディケートを使用。
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// 非同期プリディケートを使用し、最大2つのクエリを同時に実行します。
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// `for await (const result of dnsResults)` と同様に結果を出力します。
console.log(result)
})
console.log('done') // ストリームが終了しました
readable.toArray([options])
追加: v17.5.0, v16.15.0
options
<Object>signal
<AbortSignal> シグナルが中断された場合、toArray 操作をキャンセルできます。
戻り値: <Promise> ストリームの内容を含む配列を含む Promise。
このメソッドを使用すると、ストリームの内容を簡単に取得できます。
このメソッドはストリーム全体をメモリに読み込むため、ストリームの利点を無効にします。ストリームを使用する主要な方法としてではなく、相互運用性と利便性を目的としています。
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// .mapを使用してDNSクエリを同時に実行し、toArrayを使用して結果を配列に収集します
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
追加: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
呼び出しを早期に中断できます。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合、ストリームを破棄できます。
戻り値: <Promise>
fn
がチャンクの少なくとも 1 つに対して真の値を返した場合にtrue
を評価する Promise。
このメソッドはArray.prototype.some
に似ており、待機された戻り値がtrue
(または真の値)になるまで、ストリームの各チャンクでfn
を呼び出します。チャンクに対するfn
呼び出しの待機された戻り値が真の値になると、ストリームは破棄され、Promise はtrue
で満たされます。チャンクに対するfn
呼び出しのいずれも真の値を返さない場合、Promise はfalse
で満たされます。
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 同期述語を使用します。
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// 非同期述語を使用し、最大2つのファイルチェックを一度に実行します。
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // リスト内のファイルのいずれかが1MBより大きい場合`true`
console.log('done') // ストリームが終了しました
readable.find(fn[, options])
追加: v17.5.0, v16.17.0
fn
<Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中止できます。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームの破棄を許可します。
戻り値: <Promise>
fn
が真の値で評価された最初のチャンクを評価する Promise、または要素が見つからない場合はundefined
。
このメソッドは Array.prototype.find
と似ており、ストリーム内の各チャンクに対して fn
を呼び出し、fn
の真の値を持つチャンクを見つけます。fn
呼び出しの待機された戻り値が真の値になると、ストリームは破棄され、fn
が真の値を返した値を持つ Promise が解決されます。チャンクに対するすべての fn
呼び出しが偽の値を返す場合、Promise は undefined
で解決されます。
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 同期述語を使用した場合。
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// 非同期述語を使用し、最大2つのファイルチェックを同時に行う場合。
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // 大きなファイルのファイル名(リスト内のファイルが1MBより大きい場合)
console.log('done') // ストリームが終了しました
readable.every(fn[, options])
追加: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中止され、fn
の呼び出しを早期に中止できます。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合、ストリームを破棄できます。
戻り値: <Promise>
fn
がすべてのチャンクに対して真の値を返した場合にtrue
を評価する Promise。
このメソッドはArray.prototype.every
に似ており、ストリームの各チャンクに対してfn
を呼び出して、fn
のすべての待機された戻り値が真の値であるかどうかを確認します。チャンクに対するfn
呼び出しの待機された戻り値が偽の場合、ストリームは破棄され、Promise はfalse
で解決されます。チャンクに対するすべてのfn
呼び出しが真の値を返す場合、Promise はtrue
で解決されます。
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 同期述語を使用した場合。
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// 非同期述語を使用し、最大2つのファイルチェックを同時に実行した場合。
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// リスト内のすべてのファイルが1MiBより大きい場合`true`
console.log(allBigFiles)
console.log('done') // ストリームが終了しました
readable.flatMap(fn[, options])
追加日時: v17.5.0, v16.15.0
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> ストリーム内の各チャンクに対してマップする関数。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中止され、fn
の呼び出しを早期に中止できます。
options
<Object>concurrency
<number> ストリームに対して一度に呼び出すfn
の最大同時実行数。デフォルト:1
。signal
<AbortSignal> シグナルが中止された場合にストリームを破棄できます。
戻り値: <Readable> 関数
fn
でフラットマップされたストリーム。
このメソッドは、指定されたコールバックをストリームの各チャンクに適用し、その結果をフラット化することで、新しいストリームを返します。
fn
からストリームまたは別の反復可能オブジェクト(または非同期反復可能オブジェクト)を返すことが可能で、結果のストリームは返されたストリームにマージ(フラット化)されます。
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// 同期マッパーを使用。
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// 非同期マッパーを使用。4つのファイルの内容を結合する
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// これは4つのファイルすべての内容(すべてのチャンク)を含みます
console.log(result)
}
readable.drop(limit[, options])
追加日時: v17.5.0, v16.15.0
limit
<number> readable から削除するチャンク数。options
<Object>signal
<AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
戻り値: <Readable>
limit
個のチャンクが削除されたストリーム。
このメソッドは、最初のlimit
個のチャンクが削除された新しいストリームを返します。
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
追加日時: v17.5.0, v16.15.0
limit
<number> readable から取得するチャンク数。options
<Object>signal
<AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
戻り値: <Readable>
limit
個のチャンクが取得されたストリーム。
このメソッドは、最初のlimit
個のチャンクを含む新しいストリームを返します。
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
追加日時: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> ストリーム内の各チャンクに対して呼び出す縮約関数。previous
<any> 直前のfn
呼び出しで得られた値、またはinitial
値(指定されている場合)、そうでない場合はストリームの最初のチャンク。data
<any> ストリームからのデータチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
呼び出しを早期に中断できます。
initial
<any> 縮約で使用する初期値。options
<Object>signal
<AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
戻り値: <Promise> 縮約の最終値に対する Promise。
このメソッドは、ストリームの各チャンクに対して順にfn
を呼び出し、前の要素の計算結果を渡します。縮約の最終値に対する Promise を返します。
initial
値が指定されていない場合、ストリームの最初のチャンクが初期値として使用されます。ストリームが空の場合、ERR_INVALID_ARGS
コードプロパティを持つTypeError
で Promise が拒否されます。
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
縮約関数はストリーム要素を 1 つずつ反復処理するため、concurrency
パラメータや並列処理はありません。並列的にreduce
を実行するには、非同期関数をreadable.map
メソッドに抽出できます。
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
デュプレックスストリームとトランスフォームストリーム
クラス: stream.Duplex
[履歴]
バージョン | 変更内容 |
---|---|
v6.8.0 | Duplex のインスタンスは、instanceof stream.Writable をチェックした際にtrue を返すようになりました。 |
v0.9.4 | 追加: v0.9.4 |
デュプレックスストリームは、Readable
とWritable
の両方のインターフェースを実装するストリームです。
Duplex
ストリームの例:
duplex.allowHalfOpen
追加: v0.9.4
false
の場合、ストリームは読み込み側が終了したときに書き込み側を自動的に終了します。allowHalfOpen
コンストラクタオプションで初期設定され、デフォルトはtrue
です。
これは既存のDuplex
ストリームインスタンスのハーフオープン動作を変更するために手動で変更できますが、'end'
イベントが emit される前に変更する必要があります。
クラス: stream.Transform
追加: v0.9.4
トランスフォームストリームは、出力が何らかの方法で入力に関連付けられたDuplex
ストリームです。すべてのDuplex
ストリームと同様に、Transform
ストリームはReadable
とWritable
の両方のインターフェースを実装します。
Transform
ストリームの例:
transform.destroy([error])
[履歴]
バージョン | 変更内容 |
---|---|
v14.0.0 | すでに破棄されているストリームでは、no-op として機能します。 |
v8.0.0 | 追加: v8.0.0 |
ストリームを破棄し、オプションで'error'
イベントを emit します。この呼び出しの後、トランスフォームストリームは内部リソースを解放します。実装者はこのメソッドをオーバーライドするのではなく、readable._destroy()
を実装する必要があります。Transform
の_destroy()
のデフォルトの実装は、emitClose
が false に設定されていない限り'close'
も emit します。
destroy()
が呼び出された後、それ以降の呼び出しは no-op になり、_destroy()
からのエラーを除いて、'error'
としてさらにエラーは emit されません。
stream.duplexPair([options])
追加日時: v22.6.0, v20.17.0
ユーティリティ関数duplexPair
は、2 つのアイテムを持つ配列を返します。各アイテムは、もう一方と接続されたDuplex
ストリームです。
const [sideA, sideB] = duplexPair()
一方のストリームに書き込まれたものは、もう一方のストリームで読み取り可能になります。クライアントによって書き込まれたデータがサーバーによって読み取られ、その逆も同様になるネットワーク接続と同様の動作を提供します。
Duplex ストリームは対称的です。一方または他方を動作の違いなく使用できます。
stream.finished(stream[, options], callback)
[履歴]
バージョン | 変更 |
---|---|
v19.5.0 | ReadableStream とWritableStream のサポートを追加 |
v15.11.0 | signal オプションを追加 |
v14.0.0 | finished(stream, cb) は、コールバックを呼び出す前に'close' イベントを待ちます。実装ではレガシストリームを検出し、'close' を発行すると予想されるストリームにのみこの動作を適用しようとします。 |
v14.0.0 | Readable ストリームで'end' の前に'close' を発行すると、ERR_STREAM_PREMATURE_CLOSE エラーが発生します。 |
v14.0.0 | finished(stream, cb) への呼び出しの前に既に終了しているストリームでコールバックが呼び出されます。 |
v10.0.0 | 追加日時: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> 読み取り可能および/または書き込み可能なストリーム/ウェブストリーム。options
<Object>error
<boolean>false
に設定されている場合、emit('error', err)
への呼び出しは完了として扱われません。デフォルト:true
。readable
<boolean>false
に設定されている場合、ストリームがまだ読み取り可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト:true
。writable
<boolean>false
に設定されている場合、ストリームがまだ書き込み可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト:true
。signal
<AbortSignal> ストリームの終了待ちを中止できます。シグナルが中止された場合、基礎となるストリームは中止されません。コールバックはAbortError
で呼び出されます。この関数によって追加されたすべての登録済みリスナーも削除されます。
callback
<Function> オプションのエラー引数をとるコールバック関数。戻り値: <Function> 登録されているすべてのリスナーを削除するクリーンアップ関数。
ストリームが読み取り可能ではなくなり、書き込み可能ではなくなった、またはエラーが発生した、あるいは早期クローズイベントが発生したときに通知を受けるための関数です。
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream failed.', err)
} else {
console.log('Stream is done reading.')
}
})
rs.resume() // ストリームをドレインします。
ストリームが途中で破棄された(中断された HTTP リクエストなど)場合、'end'
または'finish'
を発行せず、エラー処理シナリオで特に役立ちます。
finished
API はPromise 版を提供します。
stream.finished()
は、callback
が呼び出された後も、ぶら下がっているイベントリスナー(特に'error'
、'end'
、'finish'
、'close'
)を残します。これは、(間違ったストリーム実装による)予期しない'error'
イベントによって予期しないクラッシュが発生しないようにするためです。これが望ましくない動作である場合、返されたクリーンアップ関数をコールバックで呼び出す必要があります。
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[履歴]
バージョン | 変更 |
---|---|
v19.7.0, v18.16.0 | Web Streams のサポートを追加 |
v18.0.0 | 無効なコールバックを callback 引数に渡すと、ERR_INVALID_CALLBACK ではなく ERR_INVALID_ARG_TYPE がスローされるようになりました。 |
v14.0.0 | pipeline(..., cb) は、コールバックを呼び出す前に 'close' イベントを待ちます。実装はレガシーストリームを検出し、'close' を emit することが予想されるストリームにのみこの動作を適用しようとします。 |
v13.10.0 | 非同期ジェネレータのサポートを追加 |
v10.0.0 | 追加: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- 戻り値: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- 戻り値: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- 戻り値: <AsyncIterable> | <Promise>
callback
<Function> パイプラインが完全に完了したときに呼び出されます。err
<Error>val
destination
によって返されるPromise
の解決済み値。
戻り値: <Stream>
ストリームとジェネレータ間のパイプ処理を行うモジュールメソッドで、エラーを転送し、適切にクリーンアップを行い、パイプラインが完了したときにコールバックを提供します。
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// pipeline API を使用して、一連のストリームを簡単にパイプし、
// パイプラインが完全に完了したときに通知を受け取ります。
// 潜在的に巨大な tar ファイルを効率的に gzip するパイプライン:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Pipeline failed.', err)
} else {
console.log('Pipeline succeeded.')
}
})
pipeline
API は、Promise バージョン を提供します。
stream.pipeline()
は、次のストリームを除くすべてのストリームで stream.destroy(err)
を呼び出します。
'end'
または'close'
を emit したReadable
ストリーム。'finish'
または'close'
を emit したWritable
ストリーム。
stream.pipeline()
は、callback
が呼び出された後も、ストリームにぶら下がるイベントリスナーを残します。失敗後にストリームを再利用する場合、これによりイベントリスナーのリークとエラーの飲み込みが発生する可能性があります。最後のストリームが読み取り可能な場合、最後のストリームを後で消費できるように、ぶら下がっているイベントリスナーは削除されます。
stream.pipeline()
は、エラーが発生するとすべてのストリームを閉じます。pipeline
を使用した IncomingRequest
は、予期しない応答を送信せずにソケットを破棄するため、予期しない動作につながる可能性があります。以下の例を参照してください。
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // ファイルが存在しません
// `pipeline` が既にソケットを破棄しているので、このメッセージを送信できません
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[履歴]
バージョン | 変更 |
---|---|
v21.1.0, v20.10.0 | stream クラスのサポートを追加 |
v19.8.0, v18.16.0 | Web Streams のサポートを追加 |
v16.9.0 | 追加: v16.9.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 戻り値: <stream.Duplex>
最初のストリームに書き込み、最後のストリームから読み取るDuplex
ストリームに 2 つ以上のストリームを結合します。stream.pipeline
を使用して、提供された各ストリームは次のストリームにパイプされます。ストリームのいずれかがエラーになった場合、外部のDuplex
ストリームを含むすべてのストリームが破棄されます。
stream.compose
は、順番に他のストリームにパイプできる(そしてするべき)新しいストリームを返すため、合成を可能にします。対照的に、stream.pipeline
にストリームを渡す場合、通常、最初のストリームは読み取り可能なストリームで、最後のストリームは書き込み可能なストリームであり、閉ループを形成します。
Function
が渡された場合、それはsource
Iterable
を取るファクトリメソッドでなければなりません。
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // 'HELLOWORLD' と出力
stream.compose
は、非同期イテラブル、ジェネレータ、関数をストリームに変換するために使用できます。
AsyncIterable
は読み取り可能なDuplex
に変換されます。null
を生成することはできません。AsyncGeneratorFunction
は読み取り/書き込み可能な変換Duplex
に変換されます。最初の引数としてソースAsyncIterable
を取らなければなりません。null
を生成することはできません。AsyncFunction
は書き込み可能なDuplex
に変換されます。null
またはundefined
を返す必要があります。
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// AsyncIterableを、読み取り可能なDuplexに変換します。
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// AsyncGeneratorを、変換Duplexに変換します。
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// AsyncFunctionを、書き込み可能なDuplexに変換します。
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // 'HELLOWORLD' と出力
演算子としてのstream.compose
については、readable.compose(stream)
を参照してください。
stream.Readable.from(iterable[, options])
追加: v12.3.0, v10.17.0
iterable
<Iterable>Symbol.asyncIterator
またはSymbol.iterator
イテラブルプロトコルを実装するオブジェクト。null
値が渡された場合、「error」イベントを発生させます。options
<Object>new stream.Readable([options])
に提供されるオプション。デフォルトでは、Readable.from()
はoptions.objectMode
をtrue
に設定しますが、options.objectMode
をfalse
に設定することで明示的にオプトアウトできます。- 戻り値: <stream.Readable>
イテレータから読み込みストリームを作成するためのユーティリティメソッド。
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
パフォーマンス上の理由から、Readable.from(string)
またはReadable.from(buffer)
を呼び出しても、文字列やバッファは他のストリームのセマンティクスと一致するようにイテレートされません。
Promise を含むIterable
オブジェクトが引数として渡された場合、未処理の拒否につながる可能性があります。
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未処理の拒否
])
stream.Readable.fromWeb(readableStream[, options])
追加: v17.0.0
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
戻り値: <stream.Readable>
stream.Readable.isDisturbed(stream)
追加日: v16.8.0
stream
<stream.Readable> | <ReadableStream>- 戻り値:
boolean
ストリームが読み取られたか、キャンセルされたかどうかを返します。
stream.isErrored(stream)
追加日: v17.3.0, v16.14.0
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 戻り値: <boolean>
ストリームでエラーが発生したかどうかを返します。
stream.isReadable(stream)
追加日: v17.4.0, v16.14.0
stream
<Readable> | <Duplex> | <ReadableStream>- 戻り値: <boolean>
ストリームが読み取り可能かどうかを返します。
stream.Readable.toWeb(streamReadable[, options])
追加日: v17.0.0
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 与えられたstream.Readable
からの読み取りにおいて、バックプレッシャーが適用される前に、(作成されたReadableStream
の)内部キューの最大サイズ。値が指定されていない場合、与えられたstream.Readable
から取得されます。size
<Function> 与えられたデータチャンクのサイズを計算する関数。値が指定されていない場合、すべてのチャンクのサイズは1
になります。chunk
<any>- 戻り値: <number>
- 戻り値: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
追加日時: v17.0.0
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
戻り値: <stream.Writable>
stream.Writable.toWeb(streamWritable)
追加日時: v17.0.0
streamWritable
<stream.Writable>- 戻り値: <WritableStream>
stream.Duplex.from(src)
[履歴]
バージョン | 変更点 |
---|---|
v19.5.0, v18.17.0 | src 引数に ReadableStream または WritableStream を使用できるようになりました。 |
v16.8.0 | 追加日時: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
デュプレックスストリームを作成するためのユーティリティメソッドです。
Stream
は、書き込みストリームを書き込み可能なDuplex
に、読み込みストリームをDuplex
に変換します。Blob
は読み込み可能なDuplex
に変換されます。string
は読み込み可能なDuplex
に変換されます。ArrayBuffer
は読み込み可能なDuplex
に変換されます。AsyncIterable
は読み込み可能なDuplex
に変換されます。null
を生成することはできません。AsyncGeneratorFunction
は読み込み/書き込み可能な変換Duplex
に変換されます。最初の引数としてソースAsyncIterable
を取る必要があります。null
を生成することはできません。AsyncFunction
は書き込み可能なDuplex
に変換されます。null
またはundefined
を返す必要があります。Object ({ writable, readable })
はreadable
とwritable
をStream
に変換し、それらをDuplex
に結合します。Duplex
はwritable
に書き込み、readable
から読み取ります。Promise
は読み込み可能なDuplex
に変換されます。値null
は無視されます。ReadableStream
は読み込み可能なDuplex
に変換されます。WritableStream
は書き込み可能なDuplex
に変換されます。- 戻り値: <stream.Duplex>
Promise を含む Iterable
オブジェクトを引数として渡すと、未処理の拒否が発生する可能性があります。
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未処理の拒否
])
stream.Duplex.fromWeb(pair[, options])
追加バージョン: v17.0.0
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>戻り値: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
追加日時: v17.0.0
streamDuplex
<stream.Duplex>- 戻り値: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[履歴]
バージョン | 変更点 |
---|---|
v19.7.0, v18.16.0 | ReadableStream とWritableStream のサポートを追加 |
v15.4.0 | 追加日時: v15.4.0 |
signal
<AbortSignal> キャンセルの可能性を表すシグナルstream
<Stream> | <ReadableStream> | <WritableStream> シグナルをアタッチするストリーム。
読み取り可能または書き込み可能なストリームに AbortSignal をアタッチします。これにより、AbortController
を使用してストリームの破棄を制御できます。
渡されたAbortSignal
に対応するAbortController
でabort
を呼び出すと、ストリームで.destroy(new AbortError())
を呼び出した場合と同様に動作し、web ストリームの場合はcontroller.error(new AbortError())
となります。
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// 後で、操作を中止してストリームを閉じます
controller.abort()
または、読み取り可能なストリームを非同期イテラブルとして使用するAbortSignal
を使用します。
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // タイムアウトを設定
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// 操作がキャンセルされました
} else {
throw e
}
}
})()
または、AbortSignal
をReadableStream
で使用します。
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// 操作がキャンセルされました
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
追加: v19.9.0, v18.17.0
ストリームで使用されるデフォルトの highWaterMark を返します。デフォルトは65536
(64 KiB)で、objectMode
の場合は16
です。
stream.setDefaultHighWaterMark(objectMode, value)
追加: v19.9.0, v18.17.0
ストリームで使用されるデフォルトの highWaterMark を設定します。
ストリーム実装者向け API
node:stream
モジュールの API は、JavaScript のプロトタイプ継承モデルを使用してストリームを容易に実装できるように設計されています。
まず、ストリーム開発者は、4 つの基本的なストリームクラス(stream.Writable
, stream.Readable
, stream.Duplex
, またはstream.Transform
)のいずれかを拡張する新しい JavaScript クラスを宣言し、適切な親クラスコンストラクタを呼び出していることを確認する必要があります。
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
ストリームを拡張する際には、ユーザーが提供できるオプションと、基本コンストラクタに渡す前にユーザーが提供する必要があるオプションを考慮してください。例えば、実装がautoDestroy
とemitClose
オプションに関して仮定を行う場合、ユーザーがこれらをオーバーライドすることを許可しないでください。すべてのオプションを暗黙的に転送するのではなく、転送されるオプションを明示的に指定してください。
新しいストリームクラスは、作成されるストリームの種類に応じて、以下の表に示すように、1 つ以上の特定のメソッドを実装する必要があります。
ユースケース | クラス | 実装するメソッド |
---|---|---|
読み取り専用 | Readable | _read() |
書き込み専用 | Writable | _write() , _writev() , _final() |
読み取りと書き込み | Duplex | _read() , _write() , _writev() , _final() |
書き込まれたデータに対して操作を行い、その結果を読み取る | Transform | _transform() , _flush() , _final() |
ストリームの実装コードは、ストリームのコンシューマが使用するように意図されたストリームの「パブリック」メソッド(ストリームコンシューマ向け APIセクションで説明されている)を決して呼び出してはいけません。そうすると、ストリームを消費するアプリケーションコードに悪影響を与える可能性があります。
write()
、end()
、cork()
、uncork()
、read()
、destroy()
などのパブリックメソッドをオーバーライドしたり、.emit()
を使用して'error'
、'data'
、'end'
、'finish'
、'close'
などの内部イベントを発生させたりしないでください。これを行うと、現在のストリームと将来のストリームの不変性が壊れ、他のストリーム、ストリームユーティリティ、およびユーザーの期待との動作や互換性の問題につながる可能性があります。
簡素化された構築
追加バージョン: v1.2.0
多くの単純なケースでは、継承に頼らずにストリームを作成できます。これは、stream.Writable
、stream.Readable
、stream.Duplex
、またはstream.Transform
オブジェクトのインスタンスを直接作成し、適切なメソッドをコンストラクタオプションとして渡すことで実現できます。
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// 状態の初期化とリソースの読み込み...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// リソースの解放...
},
})
書き込み可能ストリームの実装
stream.Writable
クラスは、Writable
ストリームを実装するために拡張されます。
カスタムWritable
ストリームは、new stream.Writable([options])
コンストラクタを呼び出し、writable._write()
メソッドおよび/またはwritable._writev()
メソッドを実装する必要があります。
new stream.Writable([options])
[履歴]
バージョン | 変更 |
---|---|
v22.0.0 | デフォルトの highWaterMark を増加 |
v15.5.0 | AbortSignal の受け渡しをサポート |
v14.0.0 | autoDestroy オプションのデフォルトをtrue に変更 |
v11.2.0, v10.16.0 | ストリームが'finish' を emit したりエラーが発生したときに自動的にストリームをdestroy() するautoDestroy オプションを追加 |
v10.0.0 | destroy 時に'close' を emit するかどうかを指定するemitClose オプションを追加 |
options
<Object>highWaterMark
<number>stream.write()
がfalse
を返し始めるバッファレベル。デフォルト:65536
(64 KiB)、objectMode
ストリームの場合は16
。decodeStrings
<boolean>stream.write()
に渡されたstring
をstream.write()
呼び出しで指定されたエンコーディングを使用してBuffer
にエンコードするかどうか。他の種類のデータは変換されません(つまり、Buffer
はstring
にデコードされません)。false
に設定すると、string
の変換が行われなくなります。デフォルト:true
。defaultEncoding
<string>stream.write()
に引数としてエンコーディングが指定されていない場合に使用されるデフォルトのエンコーディング。デフォルト:'utf8'
。objectMode
<boolean>stream.write(anyObj)
が有効な操作かどうか。設定されると、ストリームの実装でサポートされている場合、文字列、<Buffer>、<TypedArray>、<DataView>以外の JavaScript 値を書き込むことが可能になります。デフォルト:false
。emitClose
<boolean> ストリームが破棄された後に'close'
を emit するかどうか。デフォルト:true
。write
<Function>stream._write()
メソッドの実装。writev
<Function>stream._writev()
メソッドの実装。destroy
<Function>stream._destroy()
メソッドの実装。final
<Function>stream._final()
メソッドの実装。construct
<Function>stream._construct()
メソッドの実装。autoDestroy
<boolean> ストリームが終了後に自動的に自身で.destroy()
を呼び出すかどうか。デフォルト:true
。signal
<AbortSignal> 可能性のあるキャンセルを表すシグナル。
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// stream.Writable()コンストラクタを呼び出します。
super(options)
// ...
}
}
または、ES6 以前のスタイルのコンストラクタを使用する場合:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
または、簡素化されたコンストラクタアプローチを使用する場合:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
渡されたAbortSignal
に対応するAbortController
でabort
を呼び出すと、書き込み可能ストリームで.destroy(new AbortError())
を呼び出した場合と同じ動作になります。
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// 後で、操作を中止してストリームを閉じます
controller.abort()
writable._construct(callback)
追加日時: v15.0.0
callback
<Function> ストリームの初期化が完了したときに、(エラー引数をオプションで指定して)この関数を呼び出します。
_construct()
メソッドは、直接呼び出してはなりません。子クラスで実装できる場合があり、その場合は内部のWritable
クラスメソッドによってのみ呼び出されます。
このオプションの関数は、ストリームコンストラクタが返された後のティックで呼び出され、callback
が呼び出されるまで、_write()
、_final()
、_destroy()
の呼び出しを遅延させます。これは、ストリームを使用する前に状態を初期化したり、リソースを非同期的に初期化したりする場合に役立ちます。
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[履歴]
バージョン | 変更点 |
---|---|
v12.11.0 | _writev() を提供する場合、_write() はオプションになります。 |
chunk
<Buffer> | <string> | <any> 書き込むBuffer
。stream.write()
に渡されたstring
から変換されます。ストリームのdecodeStrings
オプションがfalse
の場合、またはストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()
に渡されたものになります。encoding
<string> チャンクが文字列の場合、encoding
はその文字列の文字エンコーディングです。チャンクがBuffer
の場合、またはストリームがオブジェクトモードで動作している場合、encoding
は無視される場合があります。callback
<Function> 指定されたチャンクの処理が完了したときに、(エラー引数をオプションで指定して)この関数を呼び出します。
すべてのWritable
ストリーム実装は、基になるリソースにデータを送信するために、writable._write()
と/またはwritable._writev()
メソッドを提供する必要があります。
Transform
ストリームは、writable._write()
の独自のインプリメンテーションを提供します。
この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装する必要があり、内部のWritable
クラスメソッドによってのみ呼び出されます。
callback
関数は、writable._write()
内で同期的に、または非同期的に(つまり、別のティックで)呼び出して、書き込みが正常に完了したか、エラーで失敗したかを信号で伝える必要があります。callback
に渡される最初の引数は、呼び出しが失敗した場合はError
オブジェクト、書き込みが成功した場合はnull
でなければなりません。
writable._write()
が呼び出されてからcallback
が呼び出されるまでの間に発生するwritable.write()
へのすべての呼び出しにより、書き込まれたデータがバッファリングされます。callback
が呼び出されると、ストリームは'drain'
イベントを発行することがあります。ストリーム実装が一度に複数のチャンクデータを処理できる場合は、writable._writev()
メソッドを実装する必要があります。
decodeStrings
プロパティがコンストラクタオプションで明示的にfalse
に設定されている場合、chunk
は.write()
に渡されたオブジェクトのままになり、Buffer
ではなく文字列になる可能性があります。これは、特定の文字列データエンコーディングに対して最適化された処理を持つ実装をサポートするためです。その場合、encoding
引数は文字列の文字エンコーディングを示します。それ以外の場合は、encoding
引数は安全に無視できます。
writable._write()
メソッドは、アンダースコアで始まるため、それを定義するクラスに対して内部であり、ユーザープログラムによって直接呼び出してはならないことを示しています。
writable._writev(chunks, callback)
chunks
<Object[]> 書き込むデータ。書き込む個別のデータチャンクを表す <Object> の配列です。これらのオブジェクトのプロパティは以下の通りです。callback
<Function> 指定されたチャンクの処理が完了したときに呼び出されるコールバック関数(オプションでエラー引数付き)。
この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装し、内部の Writable
クラスメソッドのみによって呼び出される必要があります。
writable._writev()
メソッドは、一度に複数のデータチャンクを処理できるストリーム実装において、writable._write()
に加えて、または代わりに実装できます。実装されていて、前の書き込みからのバッファリングされたデータがある場合、_write()
の代わりに _writev()
が呼び出されます。
writable._writev()
メソッドは、アンダースコアで始まるため、それを定義するクラスの内部であり、ユーザープログラムから直接呼び出されるべきではありません。
writable._destroy(err, callback)
追加日時: v8.0.0
err
<Error> エラーの可能性。callback
<Function> オプションのエラー引数をとるコールバック関数。
_destroy()
メソッドは、writable.destroy()
によって呼び出されます。子クラスでオーバーライドできますが、直接呼び出してはなりません。
writable._final(callback)
追加日時: v8.0.0
callback
<Function> 残りデータの書き込みが完了したら、この関数(オプションでエラー引数付き)を呼び出します。
_final()
メソッドは、直接呼び出してはなりません。子クラスで実装できる場合があり、その場合は内部のWritable
クラスメソッドによってのみ呼び出されます。
このオプションの関数は、ストリームが閉じる前に呼び出され、callback
が呼び出されるまで'finish'
イベントが遅延されます。これは、ストリームが終了する前にリソースを閉じたり、バッファされたデータを書いたりするのに役立ちます。
書き込み中のエラー
writable._write()
、writable._writev()
、およびwritable._final()
メソッドの処理中に発生したエラーは、コールバックを呼び出してエラーを最初の引数として渡すことで伝播する必要があります。これらのメソッド内からError
をスローしたり、'error'
イベントを手動で発行したりすると、動作が未定義になります。
Writable
がエラーを発行したときにReadable
ストリームがWritable
ストリームにパイプされている場合、Readable
ストリームはアンパイプされます。
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
書き込み可能ストリームの例
以下は、かなり単純化された(そしてやや無意味な)カスタムWritable
ストリーム実装を示しています。この特定のWritable
ストリームインスタンスは、実際には特に有用ではありませんが、この例は、カスタムWritable
ストリームインスタンスに必要な要素をそれぞれ示しています。
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
書き込み可能ストリームでのバッファのデコード
バッファのデコードは、入力が文字列であるトランスフォーマを使用する場合など、一般的なタスクです。UTF-8 などのマルチバイト文字エンコーディングを使用する場合、これは簡単なプロセスではありません。次の例は、StringDecoder
とWritable
を使用してマルチバイト文字列をデコードする方法を示しています。
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
読み込み可能ストリームの実装
Readable
ストリームを実装するには、stream.Readable
クラスを拡張します。
カスタムReadable
ストリームは、new stream.Readable([options])
コンストラクタを呼び出し、readable._read()
メソッドを実装する必要があります。
new stream.Readable([options])
[履歴]
バージョン | 変更 |
---|---|
v22.0.0 | デフォルトの highWaterMark を増加 |
v15.5.0 | AbortSignal の受け渡しをサポート |
v14.0.0 | autoDestroy オプションのデフォルトをtrue に変更 |
v11.2.0, v10.16.0 | 'end' またはエラーが発生したときにストリームを自動的にdestroy() するautoDestroy オプションを追加 |
options
<Object>highWaterMark
<number> 基になるリソースからの読み取りを停止する前に、内部バッファに格納する最大バイト数。デフォルト:65536
(64 KiB)、またはobjectMode
ストリームの場合は16
。encoding
<string> 指定した場合、バッファは指定されたエンコーディングを使用して文字列にデコードされます。デフォルト:null
。objectMode
<boolean> このストリームがオブジェクトのストリームとして動作するかどうか。つまり、stream.read(n)
は、n
サイズのBuffer
ではなく、単一の値を返します。デフォルト:false
。emitClose
<boolean> ストリームが破棄された後に'close'
を emit するかどうか。デフォルト:true
。read
<Function>stream._read()
メソッドの実装。destroy
<Function>stream._destroy()
メソッドの実装。construct
<Function>stream._construct()
メソッドの実装。autoDestroy
<boolean> 終了後にこのストリームが自動的に.destroy()
を自身に呼び出すかどうか。デフォルト:true
。signal
<AbortSignal> 可能性のあるキャンセルを表すシグナル。
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// stream.Readable(options)コンストラクタを呼び出します。
super(options)
// ...
}
}
または、ES6 以前のスタイルのコンストラクタを使用する場合:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
または、簡略化されたコンストラクタアプローチを使用する場合:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
渡されたAbortSignal
に対応するAbortController
でabort
を呼び出すと、作成された読み込み可能オブジェクトで.destroy(new AbortError())
を呼び出した場合と同じ動作になります。
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// 後で、操作を中止してストリームを閉じます
controller.abort()
readable._construct(callback)
追加日時: v15.0.0
callback
<Function> ストリームの初期化が完了した際に、(エラー引数をオプションで指定して)この関数を呼び出します。
_construct()
メソッドは、直接呼び出してはいけません。子クラスで実装できる場合があり、その場合は内部のReadable
クラスのメソッドによってのみ呼び出されます。
このオプション関数は、ストリームコンストラクタによって次のティックでスケジュールされ、callback
が呼び出されるまで、すべての_read()
と_destroy()
の呼び出しを遅延させます。これは、ストリームを使用する前に状態を初期化したり、非同期的にリソースを初期化したりする場合に便利です。
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
追加日時: v0.9.4
size
<number> 非同期で読み取るバイト数
この関数は、アプリケーションコードから直接呼び出してはいけません。子クラスで実装する必要があり、内部のReadable
クラスのメソッドによってのみ呼び出されます。
すべてのReadable
ストリーム実装は、基になるリソースからデータを取得するために、readable._read()
メソッドの実装を提供する必要があります。
readable._read()
が呼び出された場合、リソースからデータが利用可能であれば、実装はthis.push(dataChunk)
メソッドを使用して、そのデータを読み込みキューにプッシュし始める必要があります。ストリームがさらにデータを受け入れる準備ができると、this.push(dataChunk)
を呼び出すたびに_read()
が再び呼び出されます。readable.push()
がfalse
を返すまで、_read()
はリソースからの読み取りとデータのプッシュを続けることができます。_read()
が停止した後に再度呼び出された場合にのみ、追加のデータをキューにプッシュを再開する必要があります。
readable._read()
メソッドが呼び出されると、readable.push()
メソッドを介してさらにデータがプッシュされるまで、再び呼び出されることはありません。空のバッファや文字列などの空のデータは、readable._read()
を呼び出す原因にはなりません。
size
引数は助言的なものです。「読み取り」がデータが返される単一操作である実装では、size
引数を使用して、取得するデータの量を決定できます。他の実装では、この引数を無視し、データが利用可能になったときにデータを提供するだけです。stream.push(chunk)
を呼び出す前にsize
バイトが利用可能になるまで「待つ」必要はありません。
readable._read()
メソッドは、アンダースコアで始まるプレフィックスが付いているのは、それを定義するクラスの内側にあり、ユーザープログラムから直接呼び出されるべきではないためです。
readable._destroy(err, callback)
追加日時: v8.0.0
err
<Error> エラーの可能性。callback
<Function> オプションのエラー引数をとるコールバック関数。
_destroy()
メソッドは readable.destroy()
によって呼び出されます。子クラスで上書きできますが、直接呼び出してはなりません。
readable.push(chunk[, encoding])
[履歴]
バージョン | 変更点 |
---|---|
v22.0.0, v20.13.0 | chunk 引数に TypedArray または DataView インスタンスを使用できるようになりました。 |
v8.0.0 | chunk 引数に Uint8Array インスタンスを使用できるようになりました。 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 読み込みキューにプッシュするデータのチャンク。オブジェクトモードで動作していないストリームの場合、chunk
は <string>、<Buffer>、<TypedArray>、または<DataView> でなければなりません。オブジェクトモードストリームの場合、chunk
は任意の JavaScript 値にすることができます。encoding
<string> 文字列チャンクのエンコーディング。'utf8'
や'ascii'
などの有効なBuffer
エンコーディングである必要があります。- 戻り値: <boolean> さらにデータチャンクをプッシュできる場合は
true
、そうでない場合はfalse
。
chunk
が <Buffer>、<TypedArray>、<DataView>、または<string> の場合、データのchunk
はストリームのユーザーが消費するために内部キューに追加されます。chunk
を null
として渡すと、ストリームの終わり (EOF) を示し、それ以降はデータを追加できなくなります。
Readable
が一時停止モードで動作している場合、readable.push()
で追加されたデータは、'readable'
イベントが発行されたときに readable.read()
メソッドを呼び出すことで読み取ることができます。
Readable
がフローイングモードで動作している場合、readable.push()
で追加されたデータは 'data'
イベントの発行によって配信されます。
readable.push()
メソッドは可能な限り柔軟になるように設計されています。たとえば、何らかのポーズ/再開メカニズムとデータコールバックを提供する下位レベルのソースをラップする場合、下位レベルのソースはカスタム Readable
インスタンスでラップできます。
// `_source` は、readStop() と readStart() メソッド、
// データがあるときに呼び出される `ondata` メンバー、
// データが終了したときに呼び出される `onend` メンバーを持つオブジェクトです。
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// データがあるたびに、それを内部バッファーにプッシュします。
this._source.ondata = chunk => {
// push() が false を返す場合、ソースからの読み取りを停止します。
if (!this.push(chunk)) this._source.readStop()
}
// ソースが終了したら、EOF を示す `null` チャンクをプッシュします。
this._source.onend = () => {
this.push(null)
}
}
// _read() は、ストリームがより多くのデータを引き込みたいときに呼び出されます。
// この場合、アドバイザリサイズの引数は無視されます。
_read(size) {
this._source.readStart()
}
}
readable.push()
メソッドは、コンテンツを内部バッファーにプッシュするために使用されます。これは、readable._read()
メソッドによって駆動できます。
オブジェクトモードで動作していないストリームの場合、readable.push()
の chunk
パラメーターが undefined
の場合、空の文字列またはバッファーとして扱われます。詳細は readable.push('')
を参照してください。
読み込み中のエラー
readable._read()
の処理中に発生するエラーは、readable.destroy(err)
メソッドを通じて伝播する必要があります。readable._read()
内で Error
をスローしたり、'error'
イベントを手動で emit したりすると、動作が未定義になります。
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Do some work.
}
},
})
カウンターストリームの例
以下は、1 から 1,000,000 までの数字を昇順に emit し、その後終了する Readable
ストリームの基本的な例です。
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
デュプレックスストリームの実装
Duplex
ストリームは、TCP ソケット接続など、Readable
と Writable
の両方を実装するストリームです。
JavaScript は多重継承をサポートしていないため、Duplex
ストリームを実装するには(stream.Readable
と stream.Writable
クラスを拡張するのではなく)、stream.Duplex
クラスを拡張します。
stream.Duplex
クラスはプロトタイプチェーンで stream.Readable
から継承し、パラサイト的に stream.Writable
から継承しますが、stream.Writable
上で Symbol.hasInstance
をオーバーライドするため、両方の基本クラスに対して instanceof
は正しく機能します。
カスタム Duplex
ストリームは、new stream.Duplex([options])
コンストラクタを呼び出し、readable._read()
メソッドと writable._write()
メソッドの 両方 を実装する 必要があります。
new stream.Duplex(options)
[履歴]
バージョン | 変更 |
---|---|
v8.4.0 | readableHighWaterMark および writableHighWaterMark オプションがサポートされました。 |
options
<Object>Writable
およびReadable
コンストラクタの両方に渡されます。以下のフィールドも持ちます。allowHalfOpen
<boolean>false
に設定されている場合、ストリームは読み込み側が終了したときに書き込み側を自動的に終了します。デフォルト:true
。readable
<boolean>Duplex
が読み込み可能かどうかを設定します。デフォルト:true
。writable
<boolean>Duplex
が書き込み可能かどうかを設定します。デフォルト:true
。readableObjectMode
<boolean> ストリームの読み込み側のobjectMode
を設定します。objectMode
がtrue
の場合、効果はありません。デフォルト:false
。writableObjectMode
<boolean> ストリームの書き込み側のobjectMode
を設定します。objectMode
がtrue
の場合、効果はありません。デフォルト:false
。readableHighWaterMark
<number> ストリームの読み込み側のhighWaterMark
を設定します。highWaterMark
が提供されている場合、効果はありません。writableHighWaterMark
<number> ストリームの書き込み側のhighWaterMark
を設定します。highWaterMark
が提供されている場合、効果はありません。
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
または、ES6 以前スタイルのコンストラクタを使用する場合:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
または、簡略化されたコンストラクタアプローチを使用する場合:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
パイプラインを使用する場合:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // バッファではなく文字列入力を許可
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// 有効なJSONであることを確認します。
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
デュプレックスストリームの例
以下は、データの書き込みが可能で、データの読み込みも可能な仮の低レベルソースオブジェクトをラップするDuplex
ストリームの簡単な例を示しています。ただし、Node.js ストリームと互換性のない API を使用しています。以下は、Writable
インターフェースを介して受信した書き込みデータをバッファリングし、Readable
インターフェースを介して読み戻すDuplex
ストリームの簡単な例を示しています。
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// 基になるソースは文字列のみを処理します。
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
Duplex
ストリームの最も重要な側面は、Readable
側とWritable
側が、単一のオブジェクトインスタンス内に共存しているにもかかわらず、互いに独立して動作することです。
オブジェクトモードのデュプレックスストリーム
Duplex
ストリームの場合、objectMode
は、それぞれreadableObjectMode
およびwritableObjectMode
オプションを使用して、Readable
側またはWritable
側のどちらか一方にのみ排他的に設定できます。
たとえば、次の例では、オブジェクトモードのWritable
側を持ち、JavaScript の数値を受け取り、Readable
側で 16 進数文字列に変換する新しいTransform
ストリーム(Duplex
ストリームの一種)が作成されます。
const { Transform } = require('node:stream')
// すべてのTransformストリームはDuplexストリームでもあります。
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// 必要に応じてチャンクを数値に変換します。
chunk |= 0
// チャンクを他のものに変換します。
const data = chunk.toString(16)
// データを読み取り可能なキューにプッシュします。
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Prints: 01
myTransform.write(10)
// Prints: 0a
myTransform.write(100)
// Prints: 64
トランスフォームストリームの実装
Transform
ストリームは、入力を基にして出力が計算されるDuplex
ストリームです。例としては、データを圧縮、暗号化、または復号化するzlib ストリームやcrypto ストリームなどがあります。
出力が入力と同じサイズ、同じチャンク数、または同時に到着する必要はありません。たとえば、Hash
ストリームは、入力の終了時に提供される単一の出力チャンクしか持ちません。zlib
ストリームは、入力よりもはるかに小さく、またははるかに大きい出力を生成します。
stream.Transform
クラスを拡張して、Transform
ストリームを実装します。
stream.Transform
クラスはプロトタイプとして stream.Duplex
から継承し、独自の writable._write()
メソッドとreadable._read()
メソッドを実装します。カスタム Transform
実装では、transform._transform()
メソッドを実装する必要があり、transform._flush()
メソッドも実装できます。
Transform
ストリームを使用する際には、ストリームに書き込まれたデータによって、Readable
側の出力が消費されない場合、ストリームのWritable
側が一時停止される可能性があることに注意が必要です。
new stream.Transform([options])
options
<Object>Writable
とReadable
の両方のコンストラクタに渡されます。以下のフィールドも持っています。transform
<Function>stream._transform()
メソッドの実装。flush
<Function>stream._flush()
メソッドの実装。
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
または、ES6 以前のスタイルのコンストラクタを使用する場合:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
または、簡略化されたコンストラクタアプローチを使用する場合:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
イベント: 'end'
'end'
イベントは stream.Readable
クラスからのものです。'end'
イベントは、すべてのデータが出力された後、transform._flush()
のコールバックが呼び出された後に発生します。エラーが発生した場合、'end'
は発生しません。
イベント: 'finish'
'finish'
イベントは stream.Writable
クラスからのものです。'finish'
イベントは、stream.end()
が呼び出され、すべてのチャンクが stream._transform()
によって処理された後に発生します。エラーが発生した場合、'finish'
は発生しません。
transform._flush(callback)
callback
<Function> 残りデータがフラッシュされたときに呼び出されるコールバック関数(オプションでエラー引数とデータを含む)。
この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装する必要があり、内部の Readable
クラスメソッドによってのみ呼び出されるべきです。
場合によっては、変換操作でストリームの最後に追加のデータを出力する必要がある場合があります。たとえば、zlib
圧縮ストリームは、出力を最適に圧縮するために使用される内部状態の量を格納します。しかし、ストリームが終了すると、圧縮データが完全になるように、その追加データをフラッシュする必要があります。
カスタムの Transform
実装では、transform._flush()
メソッドを実装することができます。これは、消費される書き込みデータがなくなった後、'end'
イベントが Readable
ストリームの終了を示すシグナルを送信する前に呼び出されます。
transform._flush()
の実装内では、必要に応じて transform.push()
メソッドを 0 回以上呼び出すことができます。フラッシュ操作が完了したときに、callback
関数を呼び出す必要があります。
transform._flush()
メソッドにはアンダースコアが接頭辞として付けられているのは、それを定義するクラスに対して内部的なものであり、ユーザープログラムによって直接呼び出すべきではないためです。
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any>stream.write()
に渡されたstring
から変換された、変換されるBuffer
。ストリームのdecodeStrings
オプションがfalse
であるか、ストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()
に渡されたものになります。encoding
<string> チャンクが文字列の場合、エンコーディングの種類です。チャンクがバッファの場合、特別な値'buffer'
です。その場合は無視してください。callback
<Function> 指定されたchunk
が処理された後に呼び出されるコールバック関数(エラー引数とデータを含む場合もあります)。
この関数は、アプリケーションコードから直接呼び出してはいけません。子クラスで実装し、内部のReadable
クラスメソッドからのみ呼び出す必要があります。
すべてのTransform
ストリーム実装は、入力を受け入れて出力を生成する_transform()
メソッドを提供する必要があります。transform._transform()
実装は書き込まれているバイトを処理し、出力を計算してから、transform.push()
メソッドを使用して読み取り可能な部分に出力を渡します。
transform.push()
メソッドは、チャンクの結果としてどれだけ出力が生成されるかに応じて、単一入力チャンクから出力を生成するために 0 回以上呼び出すことができます。
入力データの特定のチャンクから出力が生成されない可能性があります。
callback
関数は、現在のチャンクが完全に消費された場合にのみ呼び出す必要があります。callback
に渡される最初の引数は、入力の処理中にエラーが発生した場合はError
オブジェクト、そうでない場合はnull
でなければなりません。callback
に 2 番目の引数が渡された場合、最初の引数が偽の場合にのみ、transform.push()
メソッドに転送されます。言い換えると、次のものは同等です。
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
transform._transform()
メソッドは、それを定義するクラスの内部であり、ユーザープログラムから直接呼び出してはならないため、アンダースコアで接頭辞が付けられています。
transform._transform()
は決して並列に呼び出されません。ストリームはキューメカニズムを実装しており、次のチャンクを受け取るには、callback
を同期的にまたは非同期的に呼び出す必要があります。
クラス: stream.PassThrough
stream.PassThrough
クラスは、入力バイトを単に出力側に渡すだけの、Transform
ストリームの些細な実装です。その目的は主に例とテストのためですが、stream.PassThrough
を新しい種類のストリームの構成要素として役立てるユースケースもいくつかあります。
付加情報
ストリームと非同期ジェネレータおよび非同期イテレータの互換性
JavaScript における非同期ジェネレータとイテレータのサポートにより、非同期ジェネレータは事実上、この時点でファーストクラスの言語レベルのストリーム構成要素となっています。
Node.js ストリームと非同期ジェネレータおよび非同期イテレータを使用する一般的な相互運用事例を以下に示します。
非同期イテレータによる読み込み可能ストリームの消費
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
非同期イテレータは、未処理の破壊後のエラーを防ぐために、ストリームに永続的なエラーハンドラを登録します。
非同期ジェネレータによる読み込み可能ストリームの作成
Readable.from()
ユーティリティメソッドを使用して、非同期ジェネレータから Node.js 読み込み可能ストリームを作成できます。
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
非同期イテレータからの書き込み可能ストリームへのパイプ
非同期イテレータから書き込み可能ストリームに書き込む場合は、バックプレッシャーとエラーを正しく処理する必要があります。stream.pipeline()
は、バックプレッシャーとバックプレッシャー関連のエラーの処理を抽象化します。
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// コールバックパターン
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Promiseパターン
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
以前の Node.js バージョンとの互換性
Node.js 0.10 より前のバージョンでは、Readable
ストリームインターフェースはよりシンプルでしたが、機能も使い勝手も劣っていました。
stream.read()
メソッドへの呼び出しを待つのではなく、'data'
イベントはすぐに発生し始めます。データの処理方法を決定するために何らかの作業を行う必要があるアプリケーションでは、データが失われないように読み取ったデータをバッファに保存する必要がありました。stream.pause()
メソッドは、保証されたものではなく、助言的なものでした。これは、ストリームが一時停止状態になっている場合でも、'data'
イベントを受け取る準備をする必要があることを意味します。
Node.js 0.10 では、Readable
クラスが追加されました。以前の Node.js プログラムとの下位互換性のために、Readable
ストリームは、'data'
イベントハンドラが追加された場合、またはstream.resume()
メソッドが呼び出された場合に、「フローモード」に切り替わります。その結果、新しいstream.read()
メソッドと'readable'
イベントを使用していなくても、'data'
チャンクが失われる心配がなくなります。
ほとんどのアプリケーションは正常に動作し続けますが、次の条件下ではエッジケースが発生します。
'data'
イベントリスナーが追加されていない。stream.resume()
メソッドが呼び出されていない。- ストリームが書き込み可能な宛先にパイプされていない。
例えば、以下のコードを考えてみましょう。
// WARNING! BROKEN!
net
.createServer(socket => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n')
})
})
.listen(1337)
Node.js 0.10 より前では、受信したメッセージデータは単に破棄されていました。しかし、Node.js 0.10 以降では、ソケットは永遠に一時停止状態のままになります。
この状況での回避策は、stream.resume()
メソッドを呼び出してデータのフローを開始することです。
// Workaround.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n')
})
// Start the flow of data, discarding it.
socket.resume()
})
.listen(1337)
新しいReadable
ストリームがフローモードに切り替わることに加えて、0.10 以前のスタイルのストリームは、readable.wrap()
メソッドを使用してReadable
クラスにラップできます。
readable.read(0)
実際にはデータを使用せずに、基礎となる読み込み可能ストリームメカニズムの更新をトリガーする必要がある場合があります。そのような場合、readable.read(0)
を呼び出すことができます。これは常に null
を返します。
内部読み込みバッファが highWaterMark
より小さく、ストリームが現在読み込み中ではない場合、stream.read(0)
を呼び出すと、低レベルの stream._read()
呼び出しがトリガーされます。
ほとんどのアプリケーションではこれを行う必要はほとんどありませんが、特に Readable
ストリームクラスの内部で、Node.js 内でこれを行う状況があります。
readable.push('')
readable.push('')
の使用はお勧めしません。
オブジェクトモードではないストリームに、ゼロバイトの <string>、<Buffer>、<TypedArray>、または<DataView> をプッシュすると、興味深い副作用があります。readable.push()
への呼び出しであるため、読み込みプロセスが終了します。ただし、引数が空文字列であるため、読み込み可能バッファにデータが追加されないため、ユーザーが使用できるデータはありません。
readable.setEncoding()
呼び出し後の highWaterMark
の不一致
readable.setEncoding()
の使用は、オブジェクトモードではない場合の highWaterMark
の動作を変更します。
通常、現在のバッファのサイズはバイトで highWaterMark
と比較されます。しかし、setEncoding()
が呼び出された後、比較関数はバッファのサイズを文字で測定し始めます。
latin1
や ascii
を使用する一般的なケースでは問題ありません。しかし、マルチバイト文字を含む可能性のある文字列を扱う場合は、この動作に注意することをお勧めします。