Skip to content

ストリーム

[安定版: 2 - 安定版]

安定版: 2 安定度: 2 - 安定版

ソースコード: lib/stream.js

ストリームは、Node.js でストリーミングデータを取り扱うための抽象インターフェースです。node:stream モジュールは、ストリームインターフェースを実装するための API を提供します。

Node.js には多くのストリームオブジェクトが用意されています。例えば、HTTP サーバーへのリクエストprocess.stdoutはどちらもストリームインスタンスです。

ストリームは、読み取り可能、書き込み可能、またはその両方である場合があります。すべてのストリームはEventEmitterのインスタンスです。

node:stream モジュールにアクセスするには:

js
const stream = require('node:stream')

node:stream モジュールは、新しいタイプのストリームインスタンスを作成するのに役立ちます。ストリームを消費するためにnode:stream モジュールを使用する必要は通常ありません。

このドキュメントの構成

このドキュメントには、2 つの主要なセクションと、メモのための 3 番目のセクションが含まれています。最初のセクションでは、アプリケーション内で既存のストリームを使用する方法を説明します。2 番目のセクションでは、新しいタイプのストリームを作成する方法を説明します。

ストリームの種類

Node.js には 4 つの基本的なストリームタイプがあります。

さらに、このモジュールには、ユーティリティ関数stream.duplexPair()stream.pipeline()stream.finished()stream.Readable.from()、およびstream.addAbortSignal()が含まれています。

Streams Promises API

追加日時: v15.0.0

stream/promises API は、コールバックを使用するのではなくPromiseオブジェクトを返すストリームのための非同期ユーティリティ関数の代替セットを提供します。この API はrequire('node:stream/promises')またはrequire('node:stream').promisesからアクセスできます。

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[履歴]

バージョン変更
v18.0.0, v17.2.0, v16.14.0endオプションを追加。これはfalseに設定することで、ソースが終了したときにデスティネーションストリームを自動的に閉じないようにできます。
v15.0.0追加日時: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')

AbortSignalを使用するには、最後の引数としてオプションオブジェクト内に渡します。シグナルが中断されると、AbortErrorを使用して基になるパイプラインでdestroyが呼び出されます。

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

pipeline API は非同期ジェネレータもサポートします。

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // `Buffer`ではなく文字列で処理します。
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // `Buffer`ではなく文字列で処理します。
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')

非同期ジェネレータに渡されたsignal引数を処理することを忘れないでください。特に、非同期ジェネレータがパイプラインのソース(つまり、最初の引数)である場合、パイプラインは決して完了しません。

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')

pipeline API はコールバックバージョンも提供します。

stream.finished(stream[, options])

[履歴]

バージョン変更
v19.5.0, v18.14.0ReadableStreamWritableStreamのサポートを追加
v19.1.0, v18.13.0cleanupオプションを追加
v15.0.0追加: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream is done reading.')
}

run().catch(console.error)
rs.resume() // ストリームを排出します。
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream is done reading.')
}

run().catch(console.error)
rs.resume() // ストリームを排出します。

finished API は、コールバックバージョンも提供します。

stream.finished()は、返されたプロミスが解決または拒否された後も、ぶら下がっているイベントリスナー(特に'error''end''finish''close')を残します。これは、(ストリームの実装が正しくないために)予期しない'error'イベントが予期しないクラッシュを引き起こさないようにするためです。これが望ましくない動作である場合は、options.cleanuptrueに設定する必要があります。

js
await finished(rs, { cleanup: true })

オブジェクトモード

Node.js API によって作成されたすべてのストリームは、文字列、<Buffer><TypedArray>、および<DataView>オブジェクトでのみ動作します。

  • StringsBuffersは、ストリームで使用される最も一般的なタイプです。
  • TypedArrayDataViewを使用すると、Int32ArrayUint8Arrayなどの型でバイナリデータを取り扱うことができます。TypedArrayまたはDataViewをストリームに書き込むと、Node.js は生のバイトを処理します。

ただし、ストリームの実装が他の種類の JavaScript 値(ストリーム内で特別な役割を果たすnullを除く)で動作することも可能です。このようなストリームは、「オブジェクトモード」で動作すると見なされます。

ストリームインスタンスは、ストリームの作成時にobjectModeオプションを使用してオブジェクトモードに切り替わります。既存のストリームをオブジェクトモードに切り替える試みは安全ではありません。

バッファリング

WritableReadableストリームの両方が、内部バッファにデータを格納します。

バッファされる可能性のあるデータの量は、ストリームのコンストラクタに渡されるhighWaterMarkオプションによって異なります。通常のストリームの場合、highWaterMarkオプションはバイトの総数を指定します。オブジェクトモードで動作するストリームの場合、highWaterMarkはオブジェクトの総数を指定します。文字列を操作する(ただしデコードしない)ストリームの場合、highWaterMarkは UTF-16 コード単位の総数を指定します。

データは、実装がstream.push(chunk)を呼び出したときにReadableストリームにバッファされます。ストリームのコンシューマがstream.read()を呼び出さない場合、データは消費されるまで内部キューに残ります。

内部読み込みバッファの合計サイズがhighWaterMarkで指定されたしきい値に達すると、現在バッファされているデータが消費されるまで(つまり、読み込みバッファを埋めるために使用される内部readable._read()メソッドの呼び出しをストリームが停止するまで)、ストリームは基になるリソースからのデータの読み取りを一時的に停止します。

writable.write(chunk)メソッドが繰り返し呼び出されると、データはWritableストリームにバッファされます。内部書き込みバッファの合計サイズがhighWaterMarkで設定されたしきい値を下回っている間は、writable.write()への呼び出しはtrueを返します。内部バッファのサイズがhighWaterMarkに達するかそれを超えると、falseが返されます。

stream API、特にstream.pipe()メソッドの重要な目標は、速度の異なるソースと宛先が使用可能なメモリを圧倒しないように、データのバッファリングを許容レベルに制限することです。

highWaterMarkオプションは制限ではなくしきい値です。ストリームがさらにデータの要求を停止する前にバッファするデータの量を指示します。一般的に厳格なメモリ制限を適用するものではありません。特定のストリーム実装はより厳格な制限を適用することを選択できますが、それはオプションです。

DuplexTransformストリームはどちらもReadableWritableであるため、それぞれ読み取りと書き込みに使用される2 つの個別の内部バッファを保持し、各側が適切で効率的なデータの流れを維持しながら、互いに独立して動作できます。たとえば、net.SocketインスタンスはDuplexストリームであり、そのReadable側はソケットから受信したデータの消費を許可し、Writable側はソケットへのデータの書き込みを許可します。データはソケットにデータが受信される速度よりも速くまたは遅く書き込まれる可能性があるため、各側は互いに独立して動作(およびバッファ)する必要があります。

内部バッファリングのメカニズムは内部実装の詳細であり、いつでも変更される可能性があります。ただし、特定の高度な実装では、writable.writableBufferまたはreadable.readableBufferを使用して内部バッファを取得できます。これらの文書化されていないプロパティの使用は推奨されません。

ストリームコンシューマー用 API

どんなに単純な Node.js アプリケーションでも、ほとんどの場合何らかの形でストリームを使用します。Node.js アプリケーションで HTTP サーバーを実装する際のストリームの使用例を以下に示します。

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` は http.IncomingMessage であり、読み込み可能なストリームです。
  // `res` は http.ServerResponse であり、書き込み可能なストリームです。

  let body = ''
  // データを utf8 文字列として取得します。
  // エンコーディングが設定されていない場合、Bufferオブジェクトが受信されます。
  req.setEncoding('utf8')

  // 読み込み可能なストリームは、リスナーが追加されると 'data' イベントを発生させます。
  req.on('data', chunk => {
    body += chunk
  })

  // 'end' イベントは、本体全体が受信されたことを示します。
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // ユーザーに何か興味深いものを書き戻します。
      res.write(typeof data)
      res.end()
    } catch (er) {
      // うわあ!不正なJSON!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

Writable ストリーム(例では res)は、ストリームにデータ書き込むために使用される write()end() などのメソッドを公開しています。

Readable ストリームは、EventEmitter API を使用して、ストリームから読み取ることができるデータが利用可能になったときにアプリケーションコードに通知します。利用可能なデータは、複数の方法でストリームから読み取ることができます。

Writable ストリームと Readable ストリームの両方とも、EventEmitter API をさまざまな方法で使用して、ストリームの現在の状態を伝達します。

Duplex ストリームと Transform ストリームは、両方とも Writable ストリームと Readable ストリームです。

ストリームにデータの書き込みまたはストリームからのデータの消費を行うアプリケーションは、ストリームインターフェースを直接実装する必要はなく、一般的に require('node:stream') を呼び出す理由はありません。

新しいタイプのストリームを実装したい開発者は、ストリーム実装者向け API のセクションを参照してください。

書き込みストリーム

書き込みストリームは、データが書き込まれる宛先の抽象化です。

Writable ストリームの例には、以下が含まれます。

これらの例の一部は、実際にはDuplex ストリームであり、Writable インターフェースを実装しています。

すべてのWritable ストリームは、stream.Writable クラスによって定義されたインターフェースを実装します。

Writable ストリームの具体的なインスタンスはさまざまな点で異なる場合がありますが、すべてのWritableストリームは、以下の例で示されているのと同じ基本的な使用方法のパターンに従います。

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

クラス: stream.Writable

追加されたバージョン: v0.9.4

イベント: 'close'

[履歴]

バージョン変更
v10.0.0emitClose オプションを追加して、'close' が破棄時に発行されるかどうかを指定します。
v0.9.4追加されたバージョン: v0.9.4

'close' イベントは、ストリームとその基礎となるリソース(ファイル記述子など)が閉じられたときに発生します。このイベントは、これ以上のイベントが発行されず、それ以上の計算が行われないことを示します。

Writable ストリームは、emitClose オプションを指定して作成された場合は常に'close'イベントを発行します。

イベント: 'drain'

追加されたバージョン: v0.9.4

stream.write(chunk)への呼び出しがfalseを返す場合、ストリームへのデータの書き込みを再開するのに適したタイミングで'drain'イベントが発生します。

js
// 指定された書き込みストリームに100万回データを書きます。
// バックプレッシャーに注意してください。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // 最後の回!
        writer.write(data, encoding, callback)
      } else {
        // 継続すべきか、待つかを確認します。
        // まだ完了していないため、コールバックは渡しません。
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // 途中で停止しなければなりませんでした!
      // ドレインされたらさらに書き込みます。
      writer.once('drain', write)
    }
  }
}
イベント: 'error'

追加バージョン: v0.9.4

'error' イベントは、データの書き込みまたはパイプ処理中にエラーが発生した場合に発行されます。リスナーコールバックは呼び出されるときに単一の Error 引数を渡されます。

'error' イベントが発行されると、ストリームは閉じられます。ただし、ストリームの作成時に autoDestroy オプションが false に設定されている場合は例外です。

'error' 以降は、'close' 以外のイベント('error' イベントを含む)は発行されません はずです

イベント: 'finish'

追加バージョン: v0.9.4

'finish' イベントは、stream.end() メソッドが呼び出され、すべてのデータが基になるシステムにフラッシュされた後に発行されます。

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('All writes are now complete.')
})
writer.end('This is the end\n')
イベント: 'pipe'

追加バージョン: v0.9.4

  • src <stream.Readable> この書き込み可能ストリームにパイプしているソースストリーム

'pipe' イベントは、読み込み可能ストリームで stream.pipe() メソッドが呼び出され、この書き込み可能ストリームがその宛先のセットに追加されたときに発行されます。

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Something is piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
イベント: 'unpipe'

追加バージョン: v0.9.4

'unpipe' イベントは、Readable ストリームで stream.unpipe() メソッドが呼び出され、この Writable がその宛先のセットから削除されたときに発行されます。

これは、Readable ストリームがこの Writable ストリームにパイプされたときにエラーが発生した場合にも発行されます。

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Something has stopped piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

追加されたバージョン: v0.11.2

writable.cork()メソッドは、書き込まれたすべてのデータをメモリにバッファリングします。バッファされたデータは、stream.uncork()またはstream.end()メソッドが呼び出されたときにフラッシュされます。

writable.cork()の主な目的は、複数の小さなチャンクが短時間に連続してストリームに書き込まれる状況に対応することです。writable.cork()は、それらを基になる宛先にすぐに転送する代わりに、writable.uncork()が呼び出されるまですべてのチャンクをバッファリングし、writable._writev()が存在する場合は、それらをすべてwritable._writev()に渡します。これにより、最初の小さなチャンクの処理を待っている間にデータがバッファリングされるヘッドオブラインブロッキングが発生するのを防ぎます。ただし、writable._writev()を実装せずにwritable.cork()を使用すると、スループットに悪影響を与える可能性があります。

こちらも参照してください:writable.uncork()writable._writev()

writable.destroy([error])

[履歴]

バージョン変更点
v14.0.0すでに破棄されているストリームでは何もしない動作になります。
v8.0.0追加されたバージョン: v8.0.0
  • error <Error> オプション。'error'イベントで発生させるエラー。
  • 戻り値: <this>

ストリームを破棄します。オプションで'error'イベントを発生させ、emitClosefalseに設定されていない限り'close'イベントを発生させます。この呼び出し後、書き込みストリームは終了し、その後のwrite()またはend()への呼び出しはERR_STREAM_DESTROYEDエラーになります。これは、ストリームを破壊する破壊的で即時的な方法です。以前のwrite()への呼び出しはフラッシュされていない可能性があり、ERR_STREAM_DESTROYEDエラーをトリガーする可能性があります。データが閉じる前にフラッシュする必要がある場合、またはストリームを破棄する前に'drain'イベントを待つ場合は、destroyの代わりにend()を使用してください。

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

destroy()が呼び出されると、それ以降の呼び出しは何もしなくなり、_destroy()からのエラーを除いて、'error'としてさらにエラーは発生しません。

実装者はこのメソッドをオーバーライドするのではなく、writable._destroy()を実装する必要があります。

writable.closed

追加日時: v18.0.0

'close'が emit された後、trueになります。

writable.destroyed

追加日時: v8.0.0

writable.destroy()が呼び出された後、trueになります。

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[履歴]

バージョン変更内容
v22.0.0, v20.13.0chunk引数にTypedArrayまたはDataViewインスタンスを使用できるようになりました。
v15.0.0callbackは'finish'の前に、またはエラー時に呼び出されるようになりました。
v14.0.0callbackは'finish'または'error'が emit された場合に呼び出されるようになりました。
v10.0.0このメソッドはwritableへの参照を返すようになりました。
v8.0.0chunk引数にUint8Arrayインスタンスを使用できるようになりました。
v0.9.4追加日時: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> 書き込むオプションデータ。オブジェクトモードでないストリームの場合、chunk<string><Buffer><TypedArray>、または<DataView>でなければなりません。オブジェクトモードのストリームの場合、chunknull以外の任意の JavaScript 値にすることができます。
  • encoding <string> chunkが文字列の場合のエンコーディング
  • callback <Function> ストリームが終了したときのコールバック。
  • 戻り値: <this>

writable.end()メソッドを呼び出すと、Writableにデータが書き込まれなくなることを示します。オプションのchunkencoding引数を使用すると、ストリームを閉じる直前に、最終的な追加データチャンクを書き込むことができます。

stream.end()を呼び出した後にstream.write()メソッドを呼び出すと、エラーが発生します。

js
// 'hello, 'を書き込み、その後'world!'で終了します。
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// 今後の書き込みは許可されません!
writable.setDefaultEncoding(encoding)

[履歴]

バージョン変更
v6.1.0このメソッドは、writableへの参照を返すようになりました。
v0.11.15追加: v0.11.15
  • encoding <string> 新しいデフォルトエンコーディング
  • 戻り値: <this>

writable.setDefaultEncoding()メソッドは、Writable ストリームのデフォルトのencodingを設定します。

writable.uncork()

追加: v0.11.2

writable.uncork()メソッドは、stream.cork()が呼び出されてからバッファリングされたすべてのデータをフラッシュします。

writable.cork()writable.uncork()を使用してストリームへの書き込みのバッファリングを管理する場合は、process.nextTick()を使用してwritable.uncork()への呼び出しを遅延させます。これにより、特定の Node.js イベントループフェーズ内で発生するすべてのwritable.write()呼び出しのバッチ処理が可能になります。

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

writable.cork()メソッドがストリームに対して複数回呼び出された場合、バッファリングされたデータをフラッシュするには、同じ数のwritable.uncork()呼び出しを行う必要があります。

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // データは、uncork()が2回目に呼び出されるまでフラッシュされません。
  stream.uncork()
})

こちらも参照: writable.cork()

writable.writable

追加: v11.4.0

ストリームが破棄、エラー、または終了していないことを意味し、writable.write()を呼び出すことが安全な場合はtrueです。

writable.writableAborted

追加: v18.0.0, v16.17.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

'finish'の発生前にストリームが破棄またはエラーになったかどうかを返します。

writable.writableEnded

追加時期: v12.9.0

writable.end() が呼び出された後、true になります。このプロパティはデータがフラッシュされたかどうかを示すものではなく、そのためには代わりに writable.writableFinished を使用してください。

writable.writableCorked

追加時期: v13.2.0, v12.16.0

ストリームを完全にアンコルクするためにwritable.uncork() を呼び出す必要がある回数。

writable.errored

追加時期: v18.0.0

ストリームがエラーで破棄された場合にエラーを返します。

writable.writableFinished

追加時期: v12.6.0

'finish' イベントが emit される直前に true に設定されます。

writable.writableHighWaterMark

追加時期: v9.3.0

この Writable を作成時に渡された highWaterMark の値を返します。

writable.writableLength

追加時期: v9.4.0

このプロパティには、書き込み準備ができたキュー内のバイト数(またはオブジェクト数)が含まれています。この値は、highWaterMark の状態に関する自己検査データを提供します。

writable.writableNeedDrain

追加時期: v15.2.0, v14.17.0

ストリームのバッファがいっぱいになり、ストリームが 'drain' を emit する場合、true になります。

writable.writableObjectMode

追加日時: v12.3.0

指定されたWritableストリームのobjectModeプロパティのゲッター。

writable[Symbol.asyncDispose]()

追加日時: v22.4.0, v20.16.0

[安定版: 1 - 試験的]

安定版: 1 安定性: 1 - 試験的

AbortError を用いて writable.destroy() を呼び出し、ストリームが終了した時に解決される Promise を返します。

writable.write(chunk[, encoding][, callback])

[履歴]

バージョン変更
v22.0.0, v20.13.0chunk引数にTypedArrayまたはDataViewインスタンスを使用できるようになりました。
v8.0.0chunk引数にUint8Arrayインスタンスを使用できるようになりました。
v6.0.0オブジェクトモードでもchunkパラメータにnullを渡すと常に無効と見なされるようになりました。
v0.9.4追加日時: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> 書き込むオプションデータ。オブジェクトモードで動作していないストリームの場合、chunk <string> <Buffer> <TypedArray>、または <DataView> でなければなりません。オブジェクトモードストリームの場合、chunknull以外の任意の JavaScript 値にすることができます。
  • encoding <string> | <null> chunkが文字列の場合のエンコーディング。 デフォルト: 'utf8'
  • callback <Function> このチャンクのデータがフラッシュされたときのコールバック。
  • 戻り値: <boolean> ストリームが呼び出し元のコードに追加のデータの書き込みを続ける前に'drain'イベントの発生を待つことを希望する場合false。それ以外の場合はtrue

writable.write()メソッドはストリームにデータ書き込み、データが完全に処理された後、指定されたcallbackを呼び出します。エラーが発生した場合、callbackはエラーを最初の引数として呼び出されます。callbackは非同期的に、そして'error'が emit される前に呼び出されます。

戻り値は、chunkを受け入れた後、ストリーム作成時に設定されたhighWaterMarkよりも内部バッファが小さい場合にtrueです。falseが返された場合、'drain'イベントが emit されるまで、ストリームへのデータの書き込みのさらなる試行を停止する必要があります。

ストリームが排水されていない間、write()への呼び出しはchunkをバッファリングし、false を返します。現在バッファリングされているすべてのチャンクが排水されると(オペレーティングシステムによって配信のために受け入れられると)、'drain'イベントが emit されます。write()が false を返すようになったら、'drain'イベントが emit されるまで、さらにチャンクを書き込まないでください。排水されていないストリームに対してwrite()を呼び出すことは許可されていますが、Node.js は最大メモリ使用量が発生するまで、書き込まれたすべてのチャンクをバッファリングし、その時点で無条件に中止します。中止する前でも、メモリ使用量が多いと、ガベージコレクタのパフォーマンスが悪くなり、RSS が高くなります(メモリが不要になった後でも、通常はシステムに返却されません)。TCP ソケットは、リモートピアがデータを読み取らない場合、排水されない可能性があるため、排水されていないソケットに書き込むと、リモートで悪用可能な脆弱性につながる可能性があります。

ストリームが排水されていない間にデータの書き込みを行うことは、Transformの場合に特に問題があります。なぜなら、Transformストリームは、パイプされるか、'data'または'readable'イベントハンドラが追加されるまで、デフォルトで一時停止されるからです。

書き込むデータをオンデマンドで生成または取得できる場合は、ロジックをReadableにカプセル化し、stream.pipe()を使用することをお勧めします。ただし、write()の呼び出しが優先される場合は、'drain'イベントを使用してバックプレッシャーを尊重し、メモリの問題を回避することができます。

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// cbが呼び出されるのを待つまで、他の書き込みは行わない。
write('hello', () => {
  console.log('書き込みが完了しました。さらに書き込みを実行します。')
})

オブジェクトモードのWritableストリームは、常にencoding引数を無視します。

読み込み可能ストリーム

読み込み可能ストリームは、データが消費される ソース の抽象化です。

Readable ストリームの例:

すべてのReadableストリームは、stream.Readableクラスによって定義されたインターフェースを実装します。

2 つの読み込みモード

Readableストリームは、事実上、フローモードと一時停止モードの 2 つのモードのいずれかで動作します。これらのモードは、オブジェクトモードとは別です。Readableストリームは、フローモードか一時停止モードかに関係なく、オブジェクトモードであるかどうかにかかわらず動作します。

  • フローモードでは、データは基となるシステムから自動的に読み取られ、EventEmitterインターフェースを介したイベントを使用して、できるだけ早くアプリケーションに提供されます。
  • 一時停止モードでは、stream.read()メソッドを明示的に呼び出して、ストリームからデータのチャンクを読み取る必要があります。

すべてのReadableストリームは一時停止モードで開始されますが、次のいずれかの方法でフローモードに切り替えることができます。

Readableは、次のいずれかの方法を使用して一時停止モードに戻すことができます。

  • パイプの宛先がない場合は、stream.pause()メソッドを呼び出す。
  • パイプの宛先がある場合は、すべてのパイプの宛先を削除する。複数のパイプの宛先は、stream.unpipe()メソッドを呼び出すことで削除できます。

重要な点は、データの消費または無視のメカニズムが提供されるまで、Readableはデータ生成を行わないということです。消費メカニズムが無効化または削除されると、Readableはデータの生成を 停止しようとします

下位互換性のため、'data'イベントハンドラーを削除しても、ストリームは自動的に一時停止されません。また、パイプされた宛先がある場合、stream.pause()を呼び出しても、それらの宛先がドレインしてさらにデータを求めた後にストリームが一時停止されたままになる保証はありません。

Readableがフローモードに切り替えられ、データ処理可能なコンシューマーがない場合、そのデータは失われます。これは、たとえば、'data'イベントにリスナーを接続せずにreadable.resume()メソッドが呼び出された場合、または'data'イベントハンドラーがストリームから削除された場合に発生する可能性があります。

'readable'イベントハンドラーを追加すると、ストリームは自動的にフローを停止し、データはreadable.read()を使用して消費する必要があります。'readable'イベントハンドラーが削除されると、'data'イベントハンドラーがある場合、ストリームは再びフローを開始します。

3 つの状態

Readable ストリームの「2 つのモード」の動作は、Readable ストリームの実装内で発生しているより複雑な内部状態管理に対する簡略化された抽象化です。

具体的には、任意の時点で、すべてのReadable は次の 3 つの可能な状態のいずれかになります。

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

readable.readableFlowingnull の場合、ストリームのデータを使用するためのメカニズムは提供されません。したがって、ストリームはデータを作成しません。この状態では、'data' イベントのリスナーをアタッチしたり、readable.pipe() メソッドを呼び出したり、readable.resume() メソッドを呼び出したりすると、readable.readableFlowingtrue に切り替わり、Readable はデータが生成されると積極的にイベントを発行し始めます。

readable.pause()readable.unpipe() を呼び出すか、バックプレッシャーを受け取ると、readable.readableFlowingfalse に設定され、イベントのフローは一時的に停止しますが、データの生成は停止しません。この状態では、'data' イベントのリスナーをアタッチしても、readable.readableFlowingtrue に切り替わりません。

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.

readable.readableFlowingfalse の間、データはストリームの内部バッファに蓄積されている可能性があります。

API スタイルの選択

Readable ストリーム API は、複数の Node.js バージョンで進化しており、ストリームデータを使用するための複数のメソッドを提供しています。一般的に、開発者はデータを使用するメソッドを1 つ選択する必要があり、単一のストリームからデータを使用するために複数のメソッドを使用するべきではありません。具体的には、on('data')on('readable')pipe()、または非同期イテレータを組み合わせて使用すると、直感に反する動作につながる可能性があります。

クラス: stream.Readable

追加されたバージョン: v0.9.4

イベント: 'close'

[履歴]

バージョン変更
v10.0.0emitClose オプションを追加して、'close' イベントが破棄時に emit されるかどうかを指定できるようにしました。
v0.9.4v0.9.4 で追加

'close' イベントは、ストリームとその基礎となるリソース(ファイル記述子など)が閉じられたときに発生します。このイベントは、それ以上イベントが発生せず、それ以上の計算が行われないことを示します。

Readable ストリームは、emitClose オプションを指定して作成された場合、常に 'close' イベントを発生させます。

イベント: 'data'

追加されたバージョン: v0.9.4

  • chunk <Buffer> | <string> | <any> データのチャンク。オブジェクトモードで動作していないストリームの場合、チャンクは文字列または Buffer になります。オブジェクトモードのストリームの場合、チャンクは null 以外の任意の JavaScript 値にすることができます。

'data' イベントは、ストリームがデータのチャンクの所有権をコンシューマーに譲渡するたびに発生します。これは、readable.pipe()readable.resume() を呼び出すことによって、または 'data' イベントにリスナーコールバックをアタッチすることによって、ストリームがフローイングモードに切り替えられるたびに発生する可能性があります。'data' イベントは、readable.read() メソッドが呼び出され、返されるデータのチャンクが利用可能な場合にも発生します。

明示的に一時停止されていないストリームに 'data' イベントリスナーをアタッチすると、ストリームはフローイングモードに切り替わります。データは利用可能になり次第渡されます。

リスナーコールバックには、readable.setEncoding() メソッドを使用してストリームにデフォルトのエンコーディングが指定されている場合は文字列として、そうでない場合は Buffer として、データのチャンクが渡されます。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
})
イベント: 'end'

追加バージョン: v0.9.4

'end' イベントは、ストリームから消費できるデータがなくなった時に発生します。

'end' イベントは、データが完全に消費されない限り発生しません。これは、ストリームをフローイングモードに切り替えるか、stream.read() をデータがすべて消費されるまで繰り返し呼び出すことで実現できます。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
  console.log('There will be no more data.')
})
イベント: 'error'

追加バージョン: v0.9.4

'error' イベントは、Readable 実装によっていつでも発生する可能性があります。通常、これは基盤となるストリームが内部エラーのためにデータ生成できない場合、またはストリーム実装が無効なデータチャンクをプッシュしようとした場合に発生する可能性があります。

リスナーコールバックには、単一の Error オブジェクトが渡されます。

イベント: 'pause'

追加バージョン: v0.9.4

'pause' イベントは、stream.pause() が呼び出され、readableFlowingfalse でない場合に発生します。

イベント: 'readable'

[履歴]

バージョン変更
v10.0.0'readable'.push() が呼び出された後の次のティックで常に発生します。
v10.0.0'readable' を使用するには .read() を呼び出す必要があります。
v0.9.4追加バージョン: v0.9.4

'readable' イベントは、ストリームから読み取ることができるデータが、設定されたハイウォーターマーク(state.highWaterMark)まで利用可能になったときに発生します。事実上、ストリームがバッファ内に新しい情報を持っていることを示します。このバッファ内にデータがあれば、stream.read() を呼び出してそのデータを取得できます。さらに、'readable' イベントは、ストリームの終端に達した場合にも発生する可能性があります。

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // There is some data to read now.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

ストリームの終端に達した場合、stream.read() を呼び出すと null が返され、'end' イベントが発生します。読み取るデータがまったくなかった場合も同様です。例えば、次の例では、foo.txt は空ファイルです。

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

このスクリプトを実行した出力は次のとおりです。

bash
$ node test.js
readable: null
end

場合によっては、'readable' イベントのリスナーをアタッチすると、内部バッファに一定量のデータが読み込まれます。

一般的に、readable.pipe()'data' イベントメカニズムは 'readable' イベントよりも理解しやすいです。しかし、'readable' を処理するとスループットが向上する可能性があります。

'readable''data' の両方が同時に使用されている場合、'readable' がフロー制御で優先されます。つまり、stream.read() が呼び出された場合にのみ 'data' が発生します。readableFlowing プロパティは false になります。'readable' が削除されたときに 'data' リスナーが存在する場合、ストリームはフローイングを開始します。つまり、.resume() を呼び出さずに 'data' イベントが発生します。

イベント: 'resume'

追加されたバージョン: v0.9.4

'resume' イベントは、stream.resume() が呼び出され、readableFlowingtrue でない場合に発生します。

readable.destroy([error])

[履歴]

バージョン変更点
v14.0.0すでに破棄されているストリームに対しては何もしない動作になります。
v8.0.0追加されたバージョン: v8.0.0
  • error <Error> 'error' イベントのペイロードとして渡されるエラー
  • 戻り値: <this>

ストリームを破棄します。オプションで 'error' イベントと 'close' イベントを発生させます(emitClosefalse に設定されている場合を除きます)。この呼び出しの後、読み込みストリームは内部リソースを解放し、その後の push() への呼び出しは無視されます。

destroy() が呼び出された後は、それ以降の呼び出しは何もしない動作になり、_destroy() からのエラーを除いて、'error' としてさらにエラーが発生することはありません。

実装者はこのメソッドをオーバーライドするのではなく、readable._destroy() を実装する必要があります。

readable.closed

追加されたバージョン: v18.0.0

'close' が発生した後は true になります。

readable.destroyed

追加されたバージョン: v8.0.0

readable.destroy() が呼び出された後は true になります。

readable.isPaused()

追加されたバージョン: v0.11.14

readable.isPaused() メソッドは、Readable の現在の動作状態を返します。これは主に、readable.pipe() メソッドの基礎となるメカニズムによって使用されます。ほとんどの典型的なケースでは、このメソッドを直接使用する理由はほとんどありません。

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

追加: v0.9.4

readable.pause()メソッドは、フローモードにあるストリームにおいて'data'イベントの発行を停止し、フローモードから切り替えます。利用可能になるデータは、内部バッファに残ります。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
  readable.pause()
  console.log('There will be no additional data for 1 second.')
  setTimeout(() => {
    console.log('Now data will start flowing again.')
    readable.resume()
  }, 1000)
})

'readable'イベントリスナーが存在する場合は、readable.pause()メソッドは効果がありません。

readable.pipe(destination[, options])

追加: v0.9.4

  • destination <stream.Writable> データ書き込み先の宛先

  • options <Object> パイプオプション

    • end <boolean> リーダーが終了したときにライターを終了します。デフォルト: true
  • 戻り値: <stream.Writable> DuplexまたはTransformストリームである場合、パイプの連鎖を可能にする宛先

readable.pipe()メソッドは、Writableストリームをreadableに接続し、自動的にフローモードに切り替えて、そのすべてのデータを接続されたWritableにプッシュします。データのフローは、宛先のWritableストリームがより高速なReadableストリームによって圧倒されないように、自動的に管理されます。

次の例では、readableからのすべてのデータをfile.txtという名前のファイルにパイプします。

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readableからのすべてのデータは'file.txt'に入ります。
readable.pipe(writable)

1 つのReadableストリームに複数のWritableストリームを接続できます。

readable.pipe()メソッドは宛先ストリームへの参照を返し、パイプされたストリームの連鎖を設定することを可能にします。

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

デフォルトでは、ソースReadableストリームが'end'を発行すると、宛先のWritableストリームでstream.end()が呼び出され、宛先は書き込み不可になります。このデフォルトの動作を無効にするには、endオプションをfalseとして渡すと、宛先ストリームは開いたままになります。

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

重要な注意点として、処理中にReadableストリームがエラーを発行した場合、Writableの宛先は自動的に閉じられません。エラーが発生した場合は、メモリリークを防ぐために、各ストリームを手動で閉じる必要があります。

指定されたオプションに関係なく、process.stderrprocess.stdoutWritableストリームは、Node.js プロセスが終了するまで決して閉じられません。

readable.read([size])

追加日:v0.9.4

readable.read() メソッドは、内部バッファからデータを読み出して返します。読み取るデータがない場合、null を返します。readable.setEncoding() メソッドを使用してエンコードが指定されている場合、またはストリームがオブジェクトモードで動作している場合を除き、データはデフォルトで Buffer オブジェクトとして返されます。

オプションの size 引数は、読み取るバイト数を指定します。size バイトを読み取ることができない場合、ストリームが終了している場合を除き、null が返されます。ストリームが終了している場合は、内部バッファに残っているすべてのデータが返されます。

size 引数を指定しない場合、内部バッファに含まれるすべてのデータが返されます。

size 引数は、1 GiB 以下である必要があります。

readable.read() メソッドは、一時停止モードで動作している Readable ストリームに対してのみ呼び出す必要があります。フローイングモードでは、内部バッファが完全に空になるまで readable.read() が自動的に呼び出されます。

js
const readable = getReadableStreamSomehow()

// データがバッファにバッファリングされると、'readable' が複数回トリガーされる可能性があります
readable.on('readable', () => {
  let chunk
  console.log('ストリームは読み取り可能です(バッファに新しいデータを受信)')
  // すべて利用可能なデータを読み取るためにループを使用します
  while (null !== (chunk = readable.read())) {
    console.log(`データの ${chunk.length} バイトを読み取りました...`)
  }
})

// 利用可能なデータがなくなると、'end' が1回トリガーされます
readable.on('end', () => {
  console.log('ストリームの終わりに到達しました。')
})

readable.read() を呼び出すたびに、データのチャンクまたは null が返され、その時点で読み取るデータがなくなったことを示します。これらのチャンクは自動的に連結されません。1 回の read() 呼び出しですべてのデータが返されるわけではないため、すべてのデータを取得するまでチャンクを継続的に読み取るには、while ループが必要になる場合があります。大きなファイルを読み取るとき、.read() は一時的に null を返すことがあり、バッファされたすべてのコンテンツが消費されたが、さらにバッファされるデータがある可能性があることを示します。このような場合、バッファにもうデータがあるとその時点で新しい 'readable' イベントが発行され、'end' イベントはデータ送信の終了を示します。

したがって、readable からファイルの全内容を読み取るには、複数の 'readable' イベントにわたってチャンクを収集する必要があります。

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

オブジェクトモードの Readable ストリームは、size 引数の値に関係なく、readable.read(size) を呼び出すと常に 1 つのアイテムを返します。

readable.read() メソッドがデータのチャンクを返す場合、'data' イベントも発行されます。

'end' イベントが発行された後に stream.read([size]) を呼び出すと、null が返されます。ランタイムエラーは発生しません。

readable.readable

追加:v11.4.0

readable.read() の呼び出しが安全かどうかを示す true です。これは、ストリームが破棄されていないか、'error' または 'end' を emit していないことを意味します。

readable.readableAborted

追加:v16.8.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

'end' を emit する前にストリームが破棄されたか、エラーが発生したかどうかを返します。

readable.readableDidRead

追加:v16.7.0, v14.18.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

'data' が emit されたかどうかを返します。

readable.readableEncoding

追加:v12.7.0

指定された Readable ストリームの encoding プロパティのゲッターです。encoding プロパティは、readable.setEncoding() メソッドを使用して設定できます。

readable.readableEnded

追加:v12.9.0

'end' イベントが emit されると true になります。

readable.errored

追加:v18.0.0

ストリームがエラーで破棄された場合にエラーを返します。

readable.readableFlowing

追加:v9.4.0

このプロパティは、3 つの状態 セクションで説明されているように、Readable ストリームの現在の状態を反映します。

readable.readableHighWaterMark

追加日時: v9.3.0

この Readable の作成時に渡された highWaterMark の値を返します。

readable.readableLength

追加日時: v9.4.0

このプロパティは、読み取り準備ができたキュー内のバイト数(またはオブジェクト数)を含みます。この値は、highWaterMark の状態に関するイントロスペクションデータを提供します。

readable.readableObjectMode

追加日時: v12.3.0

指定された Readable ストリームの objectMode プロパティのゲッターです。

readable.resume()

[履歴]

バージョン変更点
v10.0.0'readable' イベントリスナーが存在する場合は、resume() は効果がありません。
v0.9.4追加日時: v0.9.4

readable.resume() メソッドは、明示的に一時停止された Readable ストリームを再開して 'data' イベントの送出を再開し、ストリームをフローイングモードに切り替えます。

readable.resume() メソッドは、データ処理を行わずにストリームからデータを完全に消費するために使用できます。

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.')
  })

'readable' イベントリスナーが存在する場合は、readable.resume() メソッドは効果がありません。

readable.setEncoding(encoding)

追加日時: v0.9.4

  • encoding <string> 使用するエンコーディング。
  • 戻り値: <this>

readable.setEncoding() メソッドは、Readable ストリームから読み取られるデータの文字エンコーディングを設定します。

デフォルトでは、エンコーディングは割り当てられず、ストリームデータは Buffer オブジェクトとして返されます。エンコーディングを設定すると、ストリームデータは Buffer オブジェクトではなく、指定されたエンコーディングの文字列として返されるようになります。たとえば、readable.setEncoding('utf8') を呼び出すと、出力データは UTF-8 データとして解釈され、文字列として渡されます。readable.setEncoding('hex') を呼び出すと、データは 16 進数の文字列形式でエンコードされます。

Readable ストリームは、ストリームから Buffer オブジェクトとして単純に取得した場合に正しくデコードされなくなる可能性のある、ストリームを介して配信されるマルチバイト文字を適切に処理します。

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Got %d characters of string data:', chunk.length)
})
readable.unpipe([destination])

追加日時: v0.9.4

readable.unpipe() メソッドは、stream.pipe() メソッドを使用して以前に接続された Writable ストリームを切り離します。

destination が指定されていない場合、すべてのパイプが切り離されます。

destination が指定されているが、それにパイプが設定されていない場合、メソッドは何もしません。

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readableからのすべてのデータは'file.txt'に入りますが、最初の1秒間のみです。
readable.pipe(writable)
setTimeout(() => {
  console.log('Stop writing to file.txt.')
  readable.unpipe(writable)
  console.log('Manually close the file stream.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[履歴]

バージョン変更点
v22.0.0, v20.13.0chunk 引数に TypedArray または DataView インスタンスを使用できるようになりました。
v8.0.0chunk 引数に Uint8Array インスタンスを使用できるようになりました。
v0.9.11追加日時: v0.9.11
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 読み込みキューの先頭に挿入するデータのチャンク。オブジェクトモードではないストリームの場合、chunk<string><Buffer><TypedArray><DataView> または null でなければなりません。オブジェクトモードストリームの場合、chunk は任意の JavaScript 値にすることができます。
  • encoding <string> 文字列チャンクのエンコーディング。'utf8''ascii' などの有効な Buffer エンコーディングでなければなりません。

chunknull として渡すと、ストリームの終わり (EOF) を示し、readable.push(null) と同じ動作になり、それ以降はデータを書込むことができなくなります。EOF シグナルはバッファの最後に置かれ、バッファリングされたデータはすべてフラッシュされます。

readable.unshift() メソッドは、データのチャンクを内部バッファにプッシュします。これは、ソースから楽観的に一部のデータを取り出したコードによってストリームが消費されている特定の状況で役立ちます。これにより、データを他の当事者に渡すことができます。

stream.unshift(chunk) メソッドは、'end' イベントが発行された後では呼び出すことができず、ランタイムエラーが発生します。

stream.unshift() を使用している開発者は、代わりに Transform ストリームの使用に切り替えることを検討する必要があります。「ストリーム実装者向けの API」セクションを参照してください。

js
// \n\nで区切られたヘッダーを取り出します。
// 多すぎる場合はunshift()を使用します。
// (error, header, stream)でコールバックを呼び出します。
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // ヘッダー境界が見つかりました。
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // unshiftする前に'readable'リスナーを削除します。
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // メッセージの本文はストリームから読み取ることができます。
        callback(null, header, stream)
        return
      }
      // ヘッダーを読み取り中です。
      header += str
    }
  }
}

stream.push(chunk) とは異なり、stream.unshift(chunk) はストリームの内部読み込み状態をリセットすることによって読み込みプロセスを終了しません。これは、読み込み中に(つまり、カスタムストリームの stream._read() 実装内から)readable.unshift() が呼び出された場合に予期しない結果を引き起こす可能性があります。readable.unshift() の呼び出し後にすぐに stream.push('') を実行すると、読み込み状態が適切にリセットされますが、読み込みの実行中に readable.unshift() を呼び出さない方が最善です。

readable.wrap(stream)

追加されたバージョン: v0.9.4

  • stream <Stream> "旧スタイル"の読み込みストリーム
  • 戻り値: <this>

Node.js 0.10 より前では、ストリームは現在のnode:streamモジュール API 全体を実装していませんでした。(詳細は互換性を参照してください。)

'data'イベントを発行し、アドバイザリのみのstream.pause()メソッドを持つ古い Node.js ライブラリを使用する場合、readable.wrap()メソッドを使用して、古いストリームをデータソースとして使用するReadableストリームを作成できます。

readable.wrap()を使用する必要はほとんどありませんが、古い Node.js アプリケーションやライブラリとの対話のための便宜として提供されています。

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // etc.
})
readable[Symbol.asyncIterator]()

[履歴]

バージョン変更
v11.14.0Symbol.asyncIterator のサポートは実験的ではなくなりました。
v10.0.0追加されたバージョン: v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

ループがbreakreturn、またはthrowで終了すると、ストリームは破棄されます。言い換えれば、ストリームを反復処理すると、ストリームは完全に消費されます。ストリームはhighWaterMarkオプションに等しいサイズのチャンクで読み取られます。上記のコード例では、highWaterMarkオプションがfs.createReadStream()に提供されていないため、ファイルのデータが 64 KiB 未満の場合は、データは単一のチャンクになります。

readable[Symbol.asyncDispose]()

追加日時: v20.4.0, v18.18.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

AbortError を使用して readable.destroy() を呼び出し、ストリームが終了したときに解決する Promise を返します。

readable.compose(stream[, options])

追加日時: v19.1.0, v18.13.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // ['this', 'is', 'compose', 'as', 'operator'] を出力します

詳細は stream.compose を参照してください。

readable.iterator([options])

追加日時: v16.3.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

  • options <Object>

    • destroyOnReturn <boolean> false に設定されている場合、非同期イテレーターで return を呼び出したり、breakreturnthrow を使用して for await...of 反復処理を終了しても、ストリームは破棄されません。デフォルト: true
  • 戻り値: ストリームを使用するための <AsyncIterator>

このメソッドによって作成されたイテレーターは、for await...of ループが returnbreak、または throw によって終了した場合、またはストリームが反復中にエラーを発生させた場合に、ストリームの破棄をキャンセルするオプションをユーザーに提供します。

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 2 と 3 を出力します
  }

  console.log(readable.destroyed) // true、ストリームは完全に消費されました
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[履歴]

バージョン変更
v20.7.0, v18.19.0optionshighWaterMarkを追加
v17.4.0, v16.14.0追加: v17.4.0, v16.14.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

  • fn <Function> | <AsyncFunction> ストリーム内の各チャンクに対してマップする関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中断され、fn呼び出しを早期に中断できます。
  • options <Object>

    • concurrency <number> ストリームに対して一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • highWaterMark <number> マップされたアイテムのユーザー消費を待っている間にバッファするアイテム数。デフォルト: concurrency * 2 - 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できます。
  • 戻り値: <Readable> 関数fnでマップされたストリーム。

このメソッドを使用すると、ストリームをマップできます。fn関数は、ストリーム内の各チャンクに対して呼び出されます。fn関数が Promise を返す場合、その Promise はawaitされた後に結果ストリームに渡されます。

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// 同期マッパーを使用
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// 非同期マッパーを使用し、最大2つのクエリを同時に実行
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // resolver.resolve4のDNS結果をログ出力します。
}
readable.filter(fn[, options])

[履歴]

バージョン変更点
v20.7.0, v18.19.0optionshighWaterMark を追加
v17.4.0, v16.14.0追加: v17.4.0, v16.14.0

[安定性: 1 - 実験的]

安定性: 1 安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームからチャンクをフィルタリングする関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中止され、fn の呼び出しを早期に中止できます。
  • options <Object>

    • concurrency <number> ストリームで同時に呼び出される fn の最大同時実行数。デフォルト: 1
    • highWaterMark <number> フィルタリングされたアイテムのユーザーによる消費を待っている間にバッファするアイテムの数。デフォルト: concurrency * 2 - 1
    • signal <AbortSignal> シグナルが中止された場合にストリームの破棄を許可します。
  • 戻り値: <Readable> 述語 fn でフィルタリングされたストリーム。

このメソッドを使用すると、ストリームをフィルタリングできます。ストリーム内の各チャンクに対して fn 関数が呼び出され、真の値を返す場合、そのチャンクは結果ストリームに渡されます。fn 関数が Promise を返す場合、その Promise は await されます。

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// 同期述語を使用する場合。
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// 非同期述語を使用し、最大 2 つのクエリを一度に行う場合。
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // 解決された DNS レコードで 60 秒を超えるドメインをログ出力します。
  console.log(result)
}
readable.forEach(fn[, options])

追加: v17.5.0, v16.15.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

  • fn <Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中断され、fn の呼び出しを早期に中断できます。
  • options <Object>

    • concurrency <number> ストリームで一度に呼び出す fn の最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できます。
  • 戻り値: <Promise> ストリームが終了した時点の Promise。

このメソッドは、ストリームの反復処理を可能にします。ストリーム内の各チャンクに対して、fn関数が呼び出されます。fn関数が Promise を返す場合、その Promise はawaitされます。

このメソッドはfor await...ofループとは異なり、オプションでチャンクを同時に処理できます。さらに、forEach反復はsignalオプションを渡して関連するAbortControllerを中断することでのみ停止できますが、for await...ofbreakまたはreturnで停止できます。いずれの場合も、ストリームは破棄されます。

このメソッドは'data'イベントをリスニングするとは異なり、基盤となる仕組みでreadableイベントを使用し、同時fn呼び出しの数を制限できます。

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// 同期プリディケートを使用。
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// 非同期プリディケートを使用し、最大2つのクエリを同時に実行します。
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // `for await (const result of dnsResults)` と同様に結果を出力します。
  console.log(result)
})
console.log('done') // ストリームが終了しました
readable.toArray([options])

追加: v17.5.0, v16.15.0

[安定版: 1 - 試験的]

安定版: 1 安定性: 1 - 試験的

  • options <Object>

    • signal <AbortSignal> シグナルが中断された場合、toArray 操作をキャンセルできます。
  • 戻り値: <Promise> ストリームの内容を含む配列を含む Promise。

このメソッドを使用すると、ストリームの内容を簡単に取得できます。

このメソッドはストリーム全体をメモリに読み込むため、ストリームの利点を無効にします。ストリームを使用する主要な方法としてではなく、相互運用性と利便性を目的としています。

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// .mapを使用してDNSクエリを同時に実行し、toArrayを使用して結果を配列に収集します
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

追加: v17.5.0, v16.15.0

[安定版: 1 - 試験的]

安定版: 1 安定性: 1 - 試験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中断され、fn呼び出しを早期に中断できます。
  • options <Object>

    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合、ストリームを破棄できます。
  • 戻り値: <Promise> fnがチャンクの少なくとも 1 つに対して真の値を返した場合にtrueを評価する Promise。

このメソッドはArray.prototype.someに似ており、待機された戻り値がtrue(または真の値)になるまで、ストリームの各チャンクでfnを呼び出します。チャンクに対するfn呼び出しの待機された戻り値が真の値になると、ストリームは破棄され、Promise はtrueで満たされます。チャンクに対するfn呼び出しのいずれも真の値を返さない場合、Promise はfalseで満たされます。

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// 同期述語を使用します。
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// 非同期述語を使用し、最大2つのファイルチェックを一度に実行します。
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // リスト内のファイルのいずれかが1MBより大きい場合`true`
console.log('done') // ストリームが終了しました
readable.find(fn[, options])

追加: v17.5.0, v16.17.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中断され、fn の呼び出しを早期に中止できます。
  • options <Object>

    • concurrency <number> ストリームで一度に呼び出す fn の最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームの破棄を許可します。
  • 戻り値: <Promise> fn が真の値で評価された最初のチャンクを評価する Promise、または要素が見つからない場合は undefined

このメソッドは Array.prototype.find と似ており、ストリーム内の各チャンクに対して fn を呼び出し、fn の真の値を持つチャンクを見つけます。fn 呼び出しの待機された戻り値が真の値になると、ストリームは破棄され、fn が真の値を返した値を持つ Promise が解決されます。チャンクに対するすべての fn 呼び出しが偽の値を返す場合、Promise は undefined で解決されます。

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// 同期述語を使用した場合。
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// 非同期述語を使用し、最大2つのファイルチェックを同時に行う場合。
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // 大きなファイルのファイル名(リスト内のファイルが1MBより大きい場合)
console.log('done') // ストリームが終了しました
readable.every(fn[, options])

追加: v17.5.0, v16.15.0

[安定版: 1 - 試験的]

安定版: 1 安定性: 1 - 試験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクに対して呼び出す関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中止され、fn の呼び出しを早期に中止できます。
  • options <Object>

    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合、ストリームを破棄できます。
  • 戻り値: <Promise> fnがすべてのチャンクに対して真の値を返した場合にtrueを評価する Promise。

このメソッドはArray.prototype.everyに似ており、ストリームの各チャンクに対してfnを呼び出して、fnのすべての待機された戻り値が真の値であるかどうかを確認します。チャンクに対するfn呼び出しの待機された戻り値が偽の場合、ストリームは破棄され、Promise はfalseで解決されます。チャンクに対するすべてのfn呼び出しが真の値を返す場合、Promise はtrueで解決されます。

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// 同期述語を使用した場合。
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// 非同期述語を使用し、最大2つのファイルチェックを同時に実行した場合。
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// リスト内のすべてのファイルが1MiBより大きい場合`true`
console.log(allBigFiles)
console.log('done') // ストリームが終了しました
readable.flatMap(fn[, options])

追加日時: v17.5.0, v16.15.0

[安定版: 1 - 試験版]

安定版: 1 安定性: 1 - 試験版

  • fn <Function> | <AsyncGeneratorFunction> | <AsyncFunction> ストリーム内の各チャンクに対してマップする関数。

    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中止され、fn の呼び出しを早期に中止できます。
  • options <Object>

    • concurrency <number> ストリームに対して一度に呼び出す fn の最大同時実行数。デフォルト: 1
    • signal <AbortSignal> シグナルが中止された場合にストリームを破棄できます。
  • 戻り値: <Readable> 関数 fn でフラットマップされたストリーム。

このメソッドは、指定されたコールバックをストリームの各チャンクに適用し、その結果をフラット化することで、新しいストリームを返します。

fn からストリームまたは別の反復可能オブジェクト(または非同期反復可能オブジェクト)を返すことが可能で、結果のストリームは返されたストリームにマージ(フラット化)されます。

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// 同期マッパーを使用。
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// 非同期マッパーを使用。4つのファイルの内容を結合する
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // これは4つのファイルすべての内容(すべてのチャンク)を含みます
  console.log(result)
}
readable.drop(limit[, options])

追加日時: v17.5.0, v16.15.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

  • limit <number> readable から削除するチャンク数。

  • options <Object>

    • signal <AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
  • 戻り値: <Readable> limit個のチャンクが削除されたストリーム。

このメソッドは、最初のlimit個のチャンクが削除された新しいストリームを返します。

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

追加日時: v17.5.0, v16.15.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

  • limit <number> readable から取得するチャンク数。

  • options <Object>

    • signal <AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
  • 戻り値: <Readable> limit個のチャンクが取得されたストリーム。

このメソッドは、最初のlimit個のチャンクを含む新しいストリームを返します。

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

追加日時: v17.5.0, v16.15.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

  • fn <Function> | <AsyncFunction> ストリーム内の各チャンクに対して呼び出す縮約関数。

    • previous <any> 直前のfn呼び出しで得られた値、またはinitial値(指定されている場合)、そうでない場合はストリームの最初のチャンク。
    • data <any> ストリームからのデータチャンク。
    • options <Object>
    • signal <AbortSignal> ストリームが破棄された場合に中断され、fn呼び出しを早期に中断できます。
  • initial <any> 縮約で使用する初期値。

  • options <Object>

    • signal <AbortSignal> シグナルが中断された場合、ストリームの破棄を許可します。
  • 戻り値: <Promise> 縮約の最終値に対する Promise。

このメソッドは、ストリームの各チャンクに対して順にfnを呼び出し、前の要素の計算結果を渡します。縮約の最終値に対する Promise を返します。

initial値が指定されていない場合、ストリームの最初のチャンクが初期値として使用されます。ストリームが空の場合、ERR_INVALID_ARGSコードプロパティを持つTypeErrorで Promise が拒否されます。

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

縮約関数はストリーム要素を 1 つずつ反復処理するため、concurrencyパラメータや並列処理はありません。並列的にreduceを実行するには、非同期関数をreadable.mapメソッドに抽出できます。

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

デュプレックスストリームとトランスフォームストリーム

クラス: stream.Duplex

[履歴]

バージョン変更内容
v6.8.0Duplexのインスタンスは、instanceof stream.Writableをチェックした際にtrueを返すようになりました。
v0.9.4追加: v0.9.4

デュプレックスストリームは、ReadableWritableの両方のインターフェースを実装するストリームです。

Duplexストリームの例:

duplex.allowHalfOpen

追加: v0.9.4

falseの場合、ストリームは読み込み側が終了したときに書き込み側を自動的に終了します。allowHalfOpenコンストラクタオプションで初期設定され、デフォルトはtrueです。

これは既存のDuplexストリームインスタンスのハーフオープン動作を変更するために手動で変更できますが、'end'イベントが emit される前に変更する必要があります。

クラス: stream.Transform

追加: v0.9.4

トランスフォームストリームは、出力が何らかの方法で入力に関連付けられたDuplexストリームです。すべてのDuplexストリームと同様に、TransformストリームはReadableWritableの両方のインターフェースを実装します。

Transformストリームの例:

transform.destroy([error])

[履歴]

バージョン変更内容
v14.0.0すでに破棄されているストリームでは、no-op として機能します。
v8.0.0追加: v8.0.0

ストリームを破棄し、オプションで'error'イベントを emit します。この呼び出しの後、トランスフォームストリームは内部リソースを解放します。実装者はこのメソッドをオーバーライドするのではなく、readable._destroy()を実装する必要があります。Transform_destroy()のデフォルトの実装は、emitCloseが false に設定されていない限り'close'も emit します。

destroy()が呼び出された後、それ以降の呼び出しは no-op になり、_destroy()からのエラーを除いて、'error'としてさらにエラーは emit されません。

stream.duplexPair([options])

追加日時: v22.6.0, v20.17.0

  • options <Object> バッファリングなどのオプションを設定するために、両方のDuplexコンストラクタに渡す値。
  • 戻り値: 2 つのDuplexインスタンスの <Array>

ユーティリティ関数duplexPairは、2 つのアイテムを持つ配列を返します。各アイテムは、もう一方と接続されたDuplexストリームです。

js
const [sideA, sideB] = duplexPair()

一方のストリームに書き込まれたものは、もう一方のストリームで読み取り可能になります。クライアントによって書き込まれたデータがサーバーによって読み取られ、その逆も同様になるネットワーク接続と同様の動作を提供します。

Duplex ストリームは対称的です。一方または他方を動作の違いなく使用できます。

stream.finished(stream[, options], callback)

[履歴]

バージョン変更
v19.5.0ReadableStreamWritableStreamのサポートを追加
v15.11.0signalオプションを追加
v14.0.0finished(stream, cb)は、コールバックを呼び出す前に'close'イベントを待ちます。実装ではレガシストリームを検出し、'close'を発行すると予想されるストリームにのみこの動作を適用しようとします。
v14.0.0Readableストリームで'end'の前に'close'を発行すると、ERR_STREAM_PREMATURE_CLOSEエラーが発生します。
v14.0.0finished(stream, cb)への呼び出しの前に既に終了しているストリームでコールバックが呼び出されます。
v10.0.0追加日時: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> 読み取り可能および/または書き込み可能なストリーム/ウェブストリーム。

  • options <Object>

    • error <boolean> falseに設定されている場合、emit('error', err)への呼び出しは完了として扱われません。デフォルト: true
    • readable <boolean> falseに設定されている場合、ストリームがまだ読み取り可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト: true
    • writable <boolean> falseに設定されている場合、ストリームがまだ書き込み可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト: true
    • signal <AbortSignal> ストリームの終了待ちを中止できます。シグナルが中止された場合、基礎となるストリームは中止されません。コールバックはAbortErrorで呼び出されます。この関数によって追加されたすべての登録済みリスナーも削除されます。
  • callback <Function> オプションのエラー引数をとるコールバック関数。

  • 戻り値: <Function> 登録されているすべてのリスナーを削除するクリーンアップ関数。

ストリームが読み取り可能ではなくなり、書き込み可能ではなくなった、またはエラーが発生した、あるいは早期クローズイベントが発生したときに通知を受けるための関数です。

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream failed.', err)
  } else {
    console.log('Stream is done reading.')
  }
})

rs.resume() // ストリームをドレインします。

ストリームが途中で破棄された(中断された HTTP リクエストなど)場合、'end'または'finish'を発行せず、エラー処理シナリオで特に役立ちます。

finished API はPromise 版を提供します。

stream.finished()は、callbackが呼び出された後も、ぶら下がっているイベントリスナー(特に'error''end''finish''close')を残します。これは、(間違ったストリーム実装による)予期しない'error'イベントによって予期しないクラッシュが発生しないようにするためです。これが望ましくない動作である場合、返されたクリーンアップ関数をコールバックで呼び出す必要があります。

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[履歴]

バージョン変更
v19.7.0, v18.16.0Web Streams のサポートを追加
v18.0.0無効なコールバックを callback 引数に渡すと、ERR_INVALID_CALLBACK ではなく ERR_INVALID_ARG_TYPE がスローされるようになりました。
v14.0.0pipeline(..., cb) は、コールバックを呼び出す前に 'close' イベントを待ちます。実装はレガシーストリームを検出し、'close' を emit することが予想されるストリームにのみこの動作を適用しようとします。
v13.10.0非同期ジェネレータのサポートを追加
v10.0.0追加: v10.0.0

ストリームとジェネレータ間のパイプ処理を行うモジュールメソッドで、エラーを転送し、適切にクリーンアップを行い、パイプラインが完了したときにコールバックを提供します。

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// pipeline API を使用して、一連のストリームを簡単にパイプし、
// パイプラインが完全に完了したときに通知を受け取ります。

// 潜在的に巨大な tar ファイルを効率的に gzip するパイプライン:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Pipeline failed.', err)
  } else {
    console.log('Pipeline succeeded.')
  }
})

pipeline API は、Promise バージョン を提供します。

stream.pipeline() は、次のストリームを除くすべてのストリームで stream.destroy(err) を呼び出します。

  • 'end' または 'close' を emit した Readable ストリーム。
  • 'finish' または 'close' を emit した Writable ストリーム。

stream.pipeline() は、callback が呼び出された後も、ストリームにぶら下がるイベントリスナーを残します。失敗後にストリームを再利用する場合、これによりイベントリスナーのリークとエラーの飲み込みが発生する可能性があります。最後のストリームが読み取り可能な場合、最後のストリームを後で消費できるように、ぶら下がっているイベントリスナーは削除されます。

stream.pipeline() は、エラーが発生するとすべてのストリームを閉じます。pipeline を使用した IncomingRequest は、予期しない応答を送信せずにソケットを破棄するため、予期しない動作につながる可能性があります。以下の例を参照してください。

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // ファイルが存在しません
      // `pipeline` が既にソケットを破棄しているので、このメッセージを送信できません
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[履歴]

バージョン変更
v21.1.0, v20.10.0stream クラスのサポートを追加
v19.8.0, v18.16.0Web Streams のサポートを追加
v16.9.0追加: v16.9.0

[安定性: 1 - 試験段階]

安定性: 1 安定性: 1 - stream.compose は試験段階です。

最初のストリームに書き込み、最後のストリームから読み取るDuplexストリームに 2 つ以上のストリームを結合します。stream.pipelineを使用して、提供された各ストリームは次のストリームにパイプされます。ストリームのいずれかがエラーになった場合、外部のDuplexストリームを含むすべてのストリームが破棄されます。

stream.composeは、順番に他のストリームにパイプできる(そしてするべき)新しいストリームを返すため、合成を可能にします。対照的に、stream.pipelineにストリームを渡す場合、通常、最初のストリームは読み取り可能なストリームで、最後のストリームは書き込み可能なストリームであり、閉ループを形成します。

Functionが渡された場合、それはsource Iterableを取るファクトリメソッドでなければなりません。

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // 'HELLOWORLD' と出力

stream.composeは、非同期イテラブル、ジェネレータ、関数をストリームに変換するために使用できます。

  • AsyncIterableは読み取り可能なDuplexに変換されます。nullを生成することはできません。
  • AsyncGeneratorFunctionは読み取り/書き込み可能な変換Duplexに変換されます。最初の引数としてソースAsyncIterableを取らなければなりません。nullを生成することはできません。
  • AsyncFunctionは書き込み可能なDuplexに変換されます。nullまたはundefinedを返す必要があります。
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// AsyncIterableを、読み取り可能なDuplexに変換します。
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// AsyncGeneratorを、変換Duplexに変換します。
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// AsyncFunctionを、書き込み可能なDuplexに変換します。
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // 'HELLOWORLD' と出力

演算子としてのstream.composeについては、readable.compose(stream)を参照してください。

stream.Readable.from(iterable[, options])

追加: v12.3.0, v10.17.0

  • iterable <Iterable> Symbol.asyncIteratorまたはSymbol.iteratorイテラブルプロトコルを実装するオブジェクト。null値が渡された場合、「error」イベントを発生させます。
  • options <Object> new stream.Readable([options])に提供されるオプション。デフォルトでは、Readable.from()options.objectModetrueに設定しますが、options.objectModefalseに設定することで明示的にオプトアウトできます。
  • 戻り値: <stream.Readable>

イテレータから読み込みストリームを作成するためのユーティリティメソッド。

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

パフォーマンス上の理由から、Readable.from(string)またはReadable.from(buffer)を呼び出しても、文字列やバッファは他のストリームのセマンティクスと一致するようにイテレートされません。

Promise を含むIterableオブジェクトが引数として渡された場合、未処理の拒否につながる可能性があります。

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未処理の拒否
])

stream.Readable.fromWeb(readableStream[, options])

追加: v17.0.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

stream.Readable.isDisturbed(stream)

追加日: v16.8.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

ストリームが読み取られたか、キャンセルされたかどうかを返します。

stream.isErrored(stream)

追加日: v17.3.0, v16.14.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

ストリームでエラーが発生したかどうかを返します。

stream.isReadable(stream)

追加日: v17.4.0, v16.14.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

ストリームが読み取り可能かどうかを返します。

stream.Readable.toWeb(streamReadable[, options])

追加日: v17.0.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

  • streamReadable <stream.Readable>
  • options <Object>
    • strategy <Object>
    • highWaterMark <number> 与えられたstream.Readableからの読み取りにおいて、バックプレッシャーが適用される前に、(作成されたReadableStreamの)内部キューの最大サイズ。値が指定されていない場合、与えられたstream.Readableから取得されます。
    • size <Function> 与えられたデータチャンクのサイズを計算する関数。値が指定されていない場合、すべてのチャンクのサイズは1になります。
    • chunk <any>
    • 戻り値: <number>
  • 戻り値: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

追加日時: v17.0.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

stream.Writable.toWeb(streamWritable)

追加日時: v17.0.0

[安定版: 1 - 実験的]

安定版: 1 安定性: 1 - 実験的

stream.Duplex.from(src)

[履歴]

バージョン変更点
v19.5.0, v18.17.0src 引数に ReadableStream または WritableStream を使用できるようになりました。
v16.8.0追加日時: v16.8.0

デュプレックスストリームを作成するためのユーティリティメソッドです。

  • Stream は、書き込みストリームを書き込み可能な Duplex に、読み込みストリームを Duplex に変換します。
  • Blob は読み込み可能な Duplex に変換されます。
  • string は読み込み可能な Duplex に変換されます。
  • ArrayBuffer は読み込み可能な Duplex に変換されます。
  • AsyncIterable は読み込み可能な Duplex に変換されます。null を生成することはできません。
  • AsyncGeneratorFunction は読み込み/書き込み可能な変換 Duplex に変換されます。最初の引数としてソース AsyncIterable を取る必要があります。null を生成することはできません。
  • AsyncFunction は書き込み可能な Duplex に変換されます。null または undefined を返す必要があります。
  • Object ({ writable, readable })readablewritableStream に変換し、それらを Duplex に結合します。Duplexwritable に書き込み、readable から読み取ります。
  • Promise は読み込み可能な Duplex に変換されます。値 null は無視されます。
  • ReadableStream は読み込み可能な Duplex に変換されます。
  • WritableStream は書き込み可能な Duplex に変換されます。
  • 戻り値: <stream.Duplex>

Promise を含む Iterable オブジェクトを引数として渡すと、未処理の拒否が発生する可能性があります。

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未処理の拒否
])

stream.Duplex.fromWeb(pair[, options])

追加バージョン: v17.0.0

[安定版: 1 - 試験段階]

安定版: 1 安定性: 1 - 試験段階

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

追加日時: v17.0.0

[安定版: 1 - 試験版]

安定版: 1 安定性: 1 - 試験版

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[履歴]

バージョン変更点
v19.7.0, v18.16.0ReadableStreamWritableStreamのサポートを追加
v15.4.0追加日時: v15.4.0

読み取り可能または書き込み可能なストリームに AbortSignal をアタッチします。これにより、AbortControllerを使用してストリームの破棄を制御できます。

渡されたAbortSignalに対応するAbortControllerabortを呼び出すと、ストリームで.destroy(new AbortError())を呼び出した場合と同様に動作し、web ストリームの場合はcontroller.error(new AbortError())となります。

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// 後で、操作を中止してストリームを閉じます
controller.abort()

または、読み取り可能なストリームを非同期イテラブルとして使用するAbortSignalを使用します。

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // タイムアウトを設定
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // 操作がキャンセルされました
    } else {
      throw e
    }
  }
})()

または、AbortSignalReadableStreamで使用します。

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // 操作がキャンセルされました
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

追加: v19.9.0, v18.17.0

ストリームで使用されるデフォルトの highWaterMark を返します。デフォルトは65536(64 KiB)で、objectModeの場合は16です。

stream.setDefaultHighWaterMark(objectMode, value)

追加: v19.9.0, v18.17.0

ストリームで使用されるデフォルトの highWaterMark を設定します。

ストリーム実装者向け API

node:streamモジュールの API は、JavaScript のプロトタイプ継承モデルを使用してストリームを容易に実装できるように設計されています。

まず、ストリーム開発者は、4 つの基本的なストリームクラス(stream.Writable, stream.Readable, stream.Duplex, またはstream.Transform)のいずれかを拡張する新しい JavaScript クラスを宣言し、適切な親クラスコンストラクタを呼び出していることを確認する必要があります。

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

ストリームを拡張する際には、ユーザーが提供できるオプションと、基本コンストラクタに渡す前にユーザーが提供する必要があるオプションを考慮してください。例えば、実装がautoDestroyemitCloseオプションに関して仮定を行う場合、ユーザーがこれらをオーバーライドすることを許可しないでください。すべてのオプションを暗黙的に転送するのではなく、転送されるオプションを明示的に指定してください。

新しいストリームクラスは、作成されるストリームの種類に応じて、以下の表に示すように、1 つ以上の特定のメソッドを実装する必要があります。

ユースケースクラス実装するメソッド
読み取り専用Readable_read()
書き込み専用Writable_write() , _writev() , _final()
読み取りと書き込みDuplex_read() , _write() , _writev() , _final()
書き込まれたデータに対して操作を行い、その結果を読み取るTransform_transform() , _flush() , _final()

ストリームの実装コードは、ストリームのコンシューマが使用するように意図されたストリームの「パブリック」メソッド(ストリームコンシューマ向け APIセクションで説明されている)を決して呼び出してはいけません。そうすると、ストリームを消費するアプリケーションコードに悪影響を与える可能性があります。

write()end()cork()uncork()read()destroy()などのパブリックメソッドをオーバーライドしたり、.emit()を使用して'error''data''end''finish''close'などの内部イベントを発生させたりしないでください。これを行うと、現在のストリームと将来のストリームの不変性が壊れ、他のストリーム、ストリームユーティリティ、およびユーザーの期待との動作や互換性の問題につながる可能性があります。

簡素化された構築

追加バージョン: v1.2.0

多くの単純なケースでは、継承に頼らずにストリームを作成できます。これは、stream.Writablestream.Readablestream.Duplex、またはstream.Transformオブジェクトのインスタンスを直接作成し、適切なメソッドをコンストラクタオプションとして渡すことで実現できます。

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // 状態の初期化とリソースの読み込み...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // リソースの解放...
  },
})

書き込み可能ストリームの実装

stream.Writableクラスは、Writableストリームを実装するために拡張されます。

カスタムWritableストリームは、new stream.Writable([options])コンストラクタを呼び出し、writable._write()メソッドおよび/またはwritable._writev()メソッドを実装する必要があります

new stream.Writable([options])

[履歴]

バージョン変更
v22.0.0デフォルトの highWaterMark を増加
v15.5.0AbortSignal の受け渡しをサポート
v14.0.0autoDestroyオプションのデフォルトをtrueに変更
v11.2.0, v10.16.0ストリームが'finish'を emit したりエラーが発生したときに自動的にストリームをdestroy()するautoDestroyオプションを追加
v10.0.0destroy時に'close'を emit するかどうかを指定するemitCloseオプションを追加
  • options <Object>
    • highWaterMark <number> stream.write()falseを返し始めるバッファレベル。デフォルト: 65536 (64 KiB)、objectModeストリームの場合は16
    • decodeStrings <boolean> stream.write()に渡されたstringstream.write()呼び出しで指定されたエンコーディングを使用してBufferにエンコードするかどうか。他の種類のデータは変換されません(つまり、Bufferstringにデコードされません)。falseに設定すると、stringの変換が行われなくなります。デフォルト: true
    • defaultEncoding <string> stream.write()に引数としてエンコーディングが指定されていない場合に使用されるデフォルトのエンコーディング。デフォルト: 'utf8'
    • objectMode <boolean> stream.write(anyObj)が有効な操作かどうか。設定されると、ストリームの実装でサポートされている場合、文字列、<Buffer><TypedArray><DataView>以外の JavaScript 値を書き込むことが可能になります。デフォルト: false
    • emitClose <boolean> ストリームが破棄された後に'close'を emit するかどうか。デフォルト: true
    • write <Function> stream._write()メソッドの実装。
    • writev <Function> stream._writev()メソッドの実装。
    • destroy <Function> stream._destroy()メソッドの実装。
    • final <Function> stream._final()メソッドの実装。
    • construct <Function> stream._construct()メソッドの実装。
    • autoDestroy <boolean> ストリームが終了後に自動的に自身で.destroy()を呼び出すかどうか。デフォルト: true
    • signal <AbortSignal> 可能性のあるキャンセルを表すシグナル。
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // stream.Writable()コンストラクタを呼び出します。
    super(options)
    // ...
  }
}

または、ES6 以前のスタイルのコンストラクタを使用する場合:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

または、簡素化されたコンストラクタアプローチを使用する場合:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

渡されたAbortSignalに対応するAbortControllerabortを呼び出すと、書き込み可能ストリームで.destroy(new AbortError())を呼び出した場合と同じ動作になります。

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// 後で、操作を中止してストリームを閉じます
controller.abort()

writable._construct(callback)

追加日時: v15.0.0

  • callback <Function> ストリームの初期化が完了したときに、(エラー引数をオプションで指定して)この関数を呼び出します。

_construct()メソッドは、直接呼び出してはなりません。子クラスで実装できる場合があり、その場合は内部のWritableクラスメソッドによってのみ呼び出されます。

このオプションの関数は、ストリームコンストラクタが返された後のティックで呼び出され、callbackが呼び出されるまで、_write()_final()_destroy()の呼び出しを遅延させます。これは、ストリームを使用する前に状態を初期化したり、リソースを非同期的に初期化したりする場合に役立ちます。

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[履歴]

バージョン変更点
v12.11.0_writev()を提供する場合、_write()はオプションになります。
  • chunk <Buffer> | <string> | <any> 書き込むBufferstream.write()に渡されたstringから変換されます。ストリームのdecodeStringsオプションがfalseの場合、またはストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()に渡されたものになります。
  • encoding <string> チャンクが文字列の場合、encodingはその文字列の文字エンコーディングです。チャンクがBufferの場合、またはストリームがオブジェクトモードで動作している場合、encodingは無視される場合があります。
  • callback <Function> 指定されたチャンクの処理が完了したときに、(エラー引数をオプションで指定して)この関数を呼び出します。

すべてのWritableストリーム実装は、基になるリソースにデータを送信するために、writable._write()と/またはwritable._writev()メソッドを提供する必要があります。

Transformストリームは、writable._write()の独自のインプリメンテーションを提供します。

この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装する必要があり、内部のWritableクラスメソッドによってのみ呼び出されます。

callback関数は、writable._write()内で同期的に、または非同期的に(つまり、別のティックで)呼び出して、書き込みが正常に完了したか、エラーで失敗したかを信号で伝える必要があります。callbackに渡される最初の引数は、呼び出しが失敗した場合はErrorオブジェクト、書き込みが成功した場合はnullでなければなりません。

writable._write()が呼び出されてからcallbackが呼び出されるまでの間に発生するwritable.write()へのすべての呼び出しにより、書き込まれたデータがバッファリングされます。callbackが呼び出されると、ストリームは'drain'イベントを発行することがあります。ストリーム実装が一度に複数のチャンクデータを処理できる場合は、writable._writev()メソッドを実装する必要があります。

decodeStringsプロパティがコンストラクタオプションで明示的にfalseに設定されている場合、chunk.write()に渡されたオブジェクトのままになり、Bufferではなく文字列になる可能性があります。これは、特定の文字列データエンコーディングに対して最適化された処理を持つ実装をサポートするためです。その場合、encoding引数は文字列の文字エンコーディングを示します。それ以外の場合は、encoding引数は安全に無視できます。

writable._write()メソッドは、アンダースコアで始まるため、それを定義するクラスに対して内部であり、ユーザープログラムによって直接呼び出してはならないことを示しています。

writable._writev(chunks, callback)

  • chunks <Object[]> 書き込むデータ。書き込む個別のデータチャンクを表す <Object> の配列です。これらのオブジェクトのプロパティは以下の通りです。

    • chunk <Buffer> | <string> 書き込むデータを含むバッファインスタンスまたは文字列。WritabledecodeStrings オプションを false に設定して作成され、文字列が write() に渡された場合、chunk は文字列になります。
    • encoding <string> chunk の文字エンコーディング。chunkBuffer の場合、encoding'buffer' になります。
  • callback <Function> 指定されたチャンクの処理が完了したときに呼び出されるコールバック関数(オプションでエラー引数付き)。

この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装し、内部の Writable クラスメソッドのみによって呼び出される必要があります。

writable._writev() メソッドは、一度に複数のデータチャンクを処理できるストリーム実装において、writable._write() に加えて、または代わりに実装できます。実装されていて、前の書き込みからのバッファリングされたデータがある場合、_write() の代わりに _writev() が呼び出されます。

writable._writev() メソッドは、アンダースコアで始まるため、それを定義するクラスの内部であり、ユーザープログラムから直接呼び出されるべきではありません。

writable._destroy(err, callback)

追加日時: v8.0.0

  • err <Error> エラーの可能性。
  • callback <Function> オプションのエラー引数をとるコールバック関数。

_destroy() メソッドは、writable.destroy() によって呼び出されます。子クラスでオーバーライドできますが、直接呼び出してはなりません

writable._final(callback)

追加日時: v8.0.0

  • callback <Function> 残りデータの書き込みが完了したら、この関数(オプションでエラー引数付き)を呼び出します。

_final()メソッドは、直接呼び出してはなりません。子クラスで実装できる場合があり、その場合は内部のWritableクラスメソッドによってのみ呼び出されます。

このオプションの関数は、ストリームが閉じる前に呼び出され、callbackが呼び出されるまで'finish'イベントが遅延されます。これは、ストリームが終了する前にリソースを閉じたり、バッファされたデータを書いたりするのに役立ちます。

書き込み中のエラー

writable._write()writable._writev()、およびwritable._final()メソッドの処理中に発生したエラーは、コールバックを呼び出してエラーを最初の引数として渡すことで伝播する必要があります。これらのメソッド内からErrorをスローしたり、'error'イベントを手動で発行したりすると、動作が未定義になります。

Writableがエラーを発行したときにReadableストリームがWritableストリームにパイプされている場合、Readableストリームはアンパイプされます。

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

書き込み可能ストリームの例

以下は、かなり単純化された(そしてやや無意味な)カスタムWritableストリーム実装を示しています。この特定のWritableストリームインスタンスは、実際には特に有用ではありませんが、この例は、カスタムWritableストリームインスタンスに必要な要素をそれぞれ示しています。

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

書き込み可能ストリームでのバッファのデコード

バッファのデコードは、入力が文字列であるトランスフォーマを使用する場合など、一般的なタスクです。UTF-8 などのマルチバイト文字エンコーディングを使用する場合、これは簡単なプロセスではありません。次の例は、StringDecoderWritableを使用してマルチバイト文字列をデコードする方法を示しています。

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

読み込み可能ストリームの実装

Readableストリームを実装するには、stream.Readableクラスを拡張します。

カスタムReadableストリームは、new stream.Readable([options])コンストラクタを呼び出し、readable._read()メソッドを実装する必要があります。

new stream.Readable([options])

[履歴]

バージョン変更
v22.0.0デフォルトの highWaterMark を増加
v15.5.0AbortSignal の受け渡しをサポート
v14.0.0autoDestroyオプションのデフォルトをtrueに変更
v11.2.0, v10.16.0'end'またはエラーが発生したときにストリームを自動的にdestroy()するautoDestroyオプションを追加
  • options <Object>
    • highWaterMark <number> 基になるリソースからの読み取りを停止する前に、内部バッファに格納する最大バイト数デフォルト: 65536 (64 KiB)、またはobjectModeストリームの場合は16
    • encoding <string> 指定した場合、バッファは指定されたエンコーディングを使用して文字列にデコードされます。デフォルト: null
    • objectMode <boolean> このストリームがオブジェクトのストリームとして動作するかどうか。つまり、stream.read(n)は、nサイズのBufferではなく、単一の値を返します。デフォルト: false
    • emitClose <boolean> ストリームが破棄された後に'close'を emit するかどうか。デフォルト: true
    • read <Function> stream._read()メソッドの実装。
    • destroy <Function> stream._destroy()メソッドの実装。
    • construct <Function> stream._construct()メソッドの実装。
    • autoDestroy <boolean> 終了後にこのストリームが自動的に.destroy()を自身に呼び出すかどうか。デフォルト: true
    • signal <AbortSignal> 可能性のあるキャンセルを表すシグナル。
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // stream.Readable(options)コンストラクタを呼び出します。
    super(options)
    // ...
  }
}

または、ES6 以前のスタイルのコンストラクタを使用する場合:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

または、簡略化されたコンストラクタアプローチを使用する場合:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

渡されたAbortSignalに対応するAbortControllerabortを呼び出すと、作成された読み込み可能オブジェクトで.destroy(new AbortError())を呼び出した場合と同じ動作になります。

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// 後で、操作を中止してストリームを閉じます
controller.abort()

readable._construct(callback)

追加日時: v15.0.0

  • callback <Function> ストリームの初期化が完了した際に、(エラー引数をオプションで指定して)この関数を呼び出します。

_construct()メソッドは、直接呼び出してはいけません。子クラスで実装できる場合があり、その場合は内部のReadableクラスのメソッドによってのみ呼び出されます。

このオプション関数は、ストリームコンストラクタによって次のティックでスケジュールされ、callbackが呼び出されるまで、すべての_read()_destroy()の呼び出しを遅延させます。これは、ストリームを使用する前に状態を初期化したり、非同期的にリソースを初期化したりする場合に便利です。

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

追加日時: v0.9.4

  • size <number> 非同期で読み取るバイト数

この関数は、アプリケーションコードから直接呼び出してはいけません。子クラスで実装する必要があり、内部のReadableクラスのメソッドによってのみ呼び出されます。

すべてのReadableストリーム実装は、基になるリソースからデータを取得するために、readable._read()メソッドの実装を提供する必要があります。

readable._read()が呼び出された場合、リソースからデータが利用可能であれば、実装はthis.push(dataChunk)メソッドを使用して、そのデータを読み込みキューにプッシュし始める必要があります。ストリームがさらにデータを受け入れる準備ができると、this.push(dataChunk)を呼び出すたびに_read()が再び呼び出されます。readable.push()falseを返すまで、_read()はリソースからの読み取りとデータのプッシュを続けることができます。_read()が停止した後に再度呼び出された場合にのみ、追加のデータをキューにプッシュを再開する必要があります。

readable._read()メソッドが呼び出されると、readable.push()メソッドを介してさらにデータがプッシュされるまで、再び呼び出されることはありません。空のバッファや文字列などの空のデータは、readable._read()を呼び出す原因にはなりません。

size引数は助言的なものです。「読み取り」がデータが返される単一操作である実装では、size引数を使用して、取得するデータの量を決定できます。他の実装では、この引数を無視し、データが利用可能になったときにデータを提供するだけです。stream.push(chunk)を呼び出す前にsizeバイトが利用可能になるまで「待つ」必要はありません。

readable._read()メソッドは、アンダースコアで始まるプレフィックスが付いているのは、それを定義するクラスの内側にあり、ユーザープログラムから直接呼び出されるべきではないためです。

readable._destroy(err, callback)

追加日時: v8.0.0

  • err <Error> エラーの可能性。
  • callback <Function> オプションのエラー引数をとるコールバック関数。

_destroy() メソッドは readable.destroy() によって呼び出されます。子クラスで上書きできますが、直接呼び出してはなりません

readable.push(chunk[, encoding])

[履歴]

バージョン変更点
v22.0.0, v20.13.0chunk 引数に TypedArray または DataView インスタンスを使用できるようになりました。
v8.0.0chunk 引数に Uint8Array インスタンスを使用できるようになりました。
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 読み込みキューにプッシュするデータのチャンク。オブジェクトモードで動作していないストリームの場合、chunk<string><Buffer><TypedArray>、または<DataView> でなければなりません。オブジェクトモードストリームの場合、chunk は任意の JavaScript 値にすることができます。
  • encoding <string> 文字列チャンクのエンコーディング。'utf8''ascii' などの有効な Buffer エンコーディングである必要があります。
  • 戻り値: <boolean> さらにデータチャンクをプッシュできる場合は true、そうでない場合は false

chunk<Buffer><TypedArray><DataView>、または<string> の場合、データのchunkはストリームのユーザーが消費するために内部キューに追加されます。chunknull として渡すと、ストリームの終わり (EOF) を示し、それ以降はデータを追加できなくなります。

Readable が一時停止モードで動作している場合、readable.push() で追加されたデータは、'readable' イベントが発行されたときに readable.read() メソッドを呼び出すことで読み取ることができます。

Readable がフローイングモードで動作している場合、readable.push() で追加されたデータは 'data' イベントの発行によって配信されます。

readable.push() メソッドは可能な限り柔軟になるように設計されています。たとえば、何らかのポーズ/再開メカニズムとデータコールバックを提供する下位レベルのソースをラップする場合、下位レベルのソースはカスタム Readable インスタンスでラップできます。

js
// `_source` は、readStop() と readStart() メソッド、
// データがあるときに呼び出される `ondata` メンバー、
// データが終了したときに呼び出される `onend` メンバーを持つオブジェクトです。

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // データがあるたびに、それを内部バッファーにプッシュします。
    this._source.ondata = chunk => {
      // push() が false を返す場合、ソースからの読み取りを停止します。
      if (!this.push(chunk)) this._source.readStop()
    }

    // ソースが終了したら、EOF を示す `null` チャンクをプッシュします。
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() は、ストリームがより多くのデータを引き込みたいときに呼び出されます。
  // この場合、アドバイザリサイズの引数は無視されます。
  _read(size) {
    this._source.readStart()
  }
}

readable.push() メソッドは、コンテンツを内部バッファーにプッシュするために使用されます。これは、readable._read() メソッドによって駆動できます。

オブジェクトモードで動作していないストリームの場合、readable.push()chunk パラメーターが undefined の場合、空の文字列またはバッファーとして扱われます。詳細は readable.push('') を参照してください。

読み込み中のエラー

readable._read() の処理中に発生するエラーは、readable.destroy(err) メソッドを通じて伝播する必要があります。readable._read() 内で Error をスローしたり、'error' イベントを手動で emit したりすると、動作が未定義になります。

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Do some work.
    }
  },
})

カウンターストリームの例

以下は、1 から 1,000,000 までの数字を昇順に emit し、その後終了する Readable ストリームの基本的な例です。

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

デュプレックスストリームの実装

Duplex ストリームは、TCP ソケット接続など、ReadableWritable の両方を実装するストリームです。

JavaScript は多重継承をサポートしていないため、Duplex ストリームを実装するには(stream.Readable stream.Writable クラスを拡張するのではなく)、stream.Duplex クラスを拡張します。

stream.Duplex クラスはプロトタイプチェーンで stream.Readable から継承し、パラサイト的に stream.Writable から継承しますが、stream.Writable 上で Symbol.hasInstance をオーバーライドするため、両方の基本クラスに対して instanceof は正しく機能します。

カスタム Duplex ストリームは、new stream.Duplex([options]) コンストラクタを呼び出し、readable._read() メソッドと writable._write() メソッドの 両方 を実装する 必要があります

new stream.Duplex(options)

[履歴]

バージョン変更
v8.4.0readableHighWaterMark および writableHighWaterMark オプションがサポートされました。
  • options <Object> Writable および Readable コンストラクタの両方に渡されます。以下のフィールドも持ちます。
    • allowHalfOpen <boolean> false に設定されている場合、ストリームは読み込み側が終了したときに書き込み側を自動的に終了します。デフォルト: true
    • readable <boolean> Duplex が読み込み可能かどうかを設定します。デフォルト: true
    • writable <boolean> Duplex が書き込み可能かどうかを設定します。デフォルト: true
    • readableObjectMode <boolean> ストリームの読み込み側の objectMode を設定します。objectModetrue の場合、効果はありません。デフォルト: false
    • writableObjectMode <boolean> ストリームの書き込み側の objectMode を設定します。objectModetrue の場合、効果はありません。デフォルト: false
    • readableHighWaterMark <number> ストリームの読み込み側の highWaterMark を設定します。highWaterMark が提供されている場合、効果はありません。
    • writableHighWaterMark <number> ストリームの書き込み側の highWaterMark を設定します。highWaterMark が提供されている場合、効果はありません。
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

または、ES6 以前スタイルのコンストラクタを使用する場合:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

または、簡略化されたコンストラクタアプローチを使用する場合:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

パイプラインを使用する場合:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // バッファではなく文字列入力を許可
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // 有効なJSONであることを確認します。
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

デュプレックスストリームの例

以下は、データの書き込みが可能で、データの読み込みも可能な仮の低レベルソースオブジェクトをラップするDuplexストリームの簡単な例を示しています。ただし、Node.js ストリームと互換性のない API を使用しています。以下は、Writableインターフェースを介して受信した書き込みデータをバッファリングし、Readableインターフェースを介して読み戻すDuplexストリームの簡単な例を示しています。

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // 基になるソースは文字列のみを処理します。
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

Duplexストリームの最も重要な側面は、Readable側とWritable側が、単一のオブジェクトインスタンス内に共存しているにもかかわらず、互いに独立して動作することです。

オブジェクトモードのデュプレックスストリーム

Duplexストリームの場合、objectModeは、それぞれreadableObjectModeおよびwritableObjectModeオプションを使用して、Readable側またはWritable側のどちらか一方にのみ排他的に設定できます。

たとえば、次の例では、オブジェクトモードのWritable側を持ち、JavaScript の数値を受け取り、Readable側で 16 進数文字列に変換する新しいTransformストリーム(Duplexストリームの一種)が作成されます。

js
const { Transform } = require('node:stream')

// すべてのTransformストリームはDuplexストリームでもあります。
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // 必要に応じてチャンクを数値に変換します。
    chunk |= 0

    // チャンクを他のものに変換します。
    const data = chunk.toString(16)

    // データを読み取り可能なキューにプッシュします。
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Prints: 01
myTransform.write(10)
// Prints: 0a
myTransform.write(100)
// Prints: 64

トランスフォームストリームの実装

Transform ストリームは、入力を基にして出力が計算されるDuplex ストリームです。例としては、データを圧縮、暗号化、または復号化するzlib ストリームやcrypto ストリームなどがあります。

出力が入力と同じサイズ、同じチャンク数、または同時に到着する必要はありません。たとえば、Hash ストリームは、入力の終了時に提供される単一の出力チャンクしか持ちません。zlib ストリームは、入力よりもはるかに小さく、またははるかに大きい出力を生成します。

stream.Transform クラスを拡張して、Transform ストリームを実装します。

stream.Transform クラスはプロトタイプとして stream.Duplex から継承し、独自の writable._write() メソッドとreadable._read() メソッドを実装します。カスタム Transform 実装では、transform._transform() メソッドを実装する必要があり、transform._flush() メソッドも実装できます。

Transform ストリームを使用する際には、ストリームに書き込まれたデータによって、Readable側の出力が消費されない場合、ストリームのWritable側が一時停止される可能性があることに注意が必要です。

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

または、ES6 以前のスタイルのコンストラクタを使用する場合:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

または、簡略化されたコンストラクタアプローチを使用する場合:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

イベント: 'end'

'end' イベントは stream.Readable クラスからのものです。'end' イベントは、すべてのデータが出力された後、transform._flush() のコールバックが呼び出された後に発生します。エラーが発生した場合、'end' は発生しません。

イベント: 'finish'

'finish' イベントは stream.Writable クラスからのものです。'finish' イベントは、stream.end() が呼び出され、すべてのチャンクが stream._transform() によって処理された後に発生します。エラーが発生した場合、'finish' は発生しません。

transform._flush(callback)

  • callback <Function> 残りデータがフラッシュされたときに呼び出されるコールバック関数(オプションでエラー引数とデータを含む)。

この関数は、アプリケーションコードによって直接呼び出してはなりません。子クラスで実装する必要があり、内部の Readable クラスメソッドによってのみ呼び出されるべきです。

場合によっては、変換操作でストリームの最後に追加のデータを出力する必要がある場合があります。たとえば、zlib 圧縮ストリームは、出力を最適に圧縮するために使用される内部状態の量を格納します。しかし、ストリームが終了すると、圧縮データが完全になるように、その追加データをフラッシュする必要があります。

カスタムの Transform 実装では、transform._flush() メソッドを実装することができます。これは、消費される書き込みデータがなくなった後、'end' イベントが Readable ストリームの終了を示すシグナルを送信する前に呼び出されます。

transform._flush() の実装内では、必要に応じて transform.push() メソッドを 0 回以上呼び出すことができます。フラッシュ操作が完了したときに、callback 関数を呼び出す必要があります。

transform._flush() メソッドにはアンダースコアが接頭辞として付けられているのは、それを定義するクラスに対して内部的なものであり、ユーザープログラムによって直接呼び出すべきではないためです。

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> stream.write()に渡されたstringから変換された、変換されるBuffer。ストリームのdecodeStringsオプションがfalseであるか、ストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()に渡されたものになります。
  • encoding <string> チャンクが文字列の場合、エンコーディングの種類です。チャンクがバッファの場合、特別な値'buffer'です。その場合は無視してください。
  • callback <Function> 指定されたchunkが処理された後に呼び出されるコールバック関数(エラー引数とデータを含む場合もあります)。

この関数は、アプリケーションコードから直接呼び出してはいけません。子クラスで実装し、内部のReadableクラスメソッドからのみ呼び出す必要があります。

すべてのTransformストリーム実装は、入力を受け入れて出力を生成する_transform()メソッドを提供する必要があります。transform._transform()実装は書き込まれているバイトを処理し、出力を計算してから、transform.push()メソッドを使用して読み取り可能な部分に出力を渡します。

transform.push()メソッドは、チャンクの結果としてどれだけ出力が生成されるかに応じて、単一入力チャンクから出力を生成するために 0 回以上呼び出すことができます。

入力データの特定のチャンクから出力が生成されない可能性があります。

callback関数は、現在のチャンクが完全に消費された場合にのみ呼び出す必要があります。callbackに渡される最初の引数は、入力の処理中にエラーが発生した場合はErrorオブジェクト、そうでない場合はnullでなければなりません。callbackに 2 番目の引数が渡された場合、最初の引数が偽の場合にのみ、transform.push()メソッドに転送されます。言い換えると、次のものは同等です。

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

transform._transform()メソッドは、それを定義するクラスの内部であり、ユーザープログラムから直接呼び出してはならないため、アンダースコアで接頭辞が付けられています。

transform._transform()は決して並列に呼び出されません。ストリームはキューメカニズムを実装しており、次のチャンクを受け取るには、callbackを同期的にまたは非同期的に呼び出す必要があります。

クラス: stream.PassThrough

stream.PassThroughクラスは、入力バイトを単に出力側に渡すだけの、Transformストリームの些細な実装です。その目的は主に例とテストのためですが、stream.PassThroughを新しい種類のストリームの構成要素として役立てるユースケースもいくつかあります。

付加情報

ストリームと非同期ジェネレータおよび非同期イテレータの互換性

JavaScript における非同期ジェネレータとイテレータのサポートにより、非同期ジェネレータは事実上、この時点でファーストクラスの言語レベルのストリーム構成要素となっています。

Node.js ストリームと非同期ジェネレータおよび非同期イテレータを使用する一般的な相互運用事例を以下に示します。

非同期イテレータによる読み込み可能ストリームの消費

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

非同期イテレータは、未処理の破壊後のエラーを防ぐために、ストリームに永続的なエラーハンドラを登録します。

非同期ジェネレータによる読み込み可能ストリームの作成

Readable.from()ユーティリティメソッドを使用して、非同期ジェネレータから Node.js 読み込み可能ストリームを作成できます。

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

非同期イテレータからの書き込み可能ストリームへのパイプ

非同期イテレータから書き込み可能ストリームに書き込む場合は、バックプレッシャーとエラーを正しく処理する必要があります。stream.pipeline()は、バックプレッシャーとバックプレッシャー関連のエラーの処理を抽象化します。

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// コールバックパターン
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Promiseパターン
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

以前の Node.js バージョンとの互換性

Node.js 0.10 より前のバージョンでは、Readableストリームインターフェースはよりシンプルでしたが、機能も使い勝手も劣っていました。

  • stream.read()メソッドへの呼び出しを待つのではなく、'data'イベントはすぐに発生し始めます。データの処理方法を決定するために何らかの作業を行う必要があるアプリケーションでは、データが失われないように読み取ったデータをバッファに保存する必要がありました。
  • stream.pause()メソッドは、保証されたものではなく、助言的なものでした。これは、ストリームが一時停止状態になっている場合でも、'data'イベントを受け取る準備をする必要があることを意味します。

Node.js 0.10 では、Readableクラスが追加されました。以前の Node.js プログラムとの下位互換性のために、Readableストリームは、'data'イベントハンドラが追加された場合、またはstream.resume()メソッドが呼び出された場合に、「フローモード」に切り替わります。その結果、新しいstream.read()メソッドと'readable'イベントを使用していなくても、'data'チャンクが失われる心配がなくなります。

ほとんどのアプリケーションは正常に動作し続けますが、次の条件下ではエッジケースが発生します。

  • 'data'イベントリスナーが追加されていない。
  • stream.resume()メソッドが呼び出されていない。
  • ストリームが書き込み可能な宛先にパイプされていない。

例えば、以下のコードを考えてみましょう。

js
// WARNING!  BROKEN!
net
  .createServer(socket => {
    // We add an 'end' listener, but never consume the data.
    socket.on('end', () => {
      // It will never get here.
      socket.end('The message was received but was not processed.\n')
    })
  })
  .listen(1337)

Node.js 0.10 より前では、受信したメッセージデータは単に破棄されていました。しかし、Node.js 0.10 以降では、ソケットは永遠に一時停止状態のままになります。

この状況での回避策は、stream.resume()メソッドを呼び出してデータのフローを開始することです。

js
// Workaround.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('The message was received but was not processed.\n')
    })

    // Start the flow of data, discarding it.
    socket.resume()
  })
  .listen(1337)

新しいReadableストリームがフローモードに切り替わることに加えて、0.10 以前のスタイルのストリームは、readable.wrap()メソッドを使用してReadableクラスにラップできます。

readable.read(0)

実際にはデータを使用せずに、基礎となる読み込み可能ストリームメカニズムの更新をトリガーする必要がある場合があります。そのような場合、readable.read(0) を呼び出すことができます。これは常に null を返します。

内部読み込みバッファが highWaterMark より小さく、ストリームが現在読み込み中ではない場合、stream.read(0) を呼び出すと、低レベルの stream._read() 呼び出しがトリガーされます。

ほとんどのアプリケーションではこれを行う必要はほとんどありませんが、特に Readable ストリームクラスの内部で、Node.js 内でこれを行う状況があります。

readable.push('')

readable.push('') の使用はお勧めしません。

オブジェクトモードではないストリームに、ゼロバイトの <string><Buffer><TypedArray>、または<DataView> をプッシュすると、興味深い副作用があります。readable.push() への呼び出しであるため、読み込みプロセスが終了します。ただし、引数が空文字列であるため、読み込み可能バッファにデータが追加されないため、ユーザーが使用できるデータはありません。

readable.setEncoding() 呼び出し後の highWaterMark の不一致

readable.setEncoding() の使用は、オブジェクトモードではない場合の highWaterMark の動作を変更します。

通常、現在のバッファのサイズはバイトhighWaterMark と比較されます。しかし、setEncoding() が呼び出された後、比較関数はバッファのサイズを文字で測定し始めます。

latin1ascii を使用する一般的なケースでは問題ありません。しかし、マルチバイト文字を含む可能性のある文字列を扱う場合は、この動作に注意することをお勧めします。