Skip to content

[稳定: 2 - 稳定]

稳定: 2 稳定性: 2 - 稳定

源代码: lib/stream.js

流是用于在 Node.js 中处理流式数据的抽象接口。node:stream 模块提供了一个实现流接口的 API。

Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求process.stdout 都是流实例。

流可以是可读的、可写的,或者两者都是。所有流都是 EventEmitter 的实例。

要访问 node:stream 模块:

js
const stream = require('node:stream')

node:stream 模块用于创建新的流实例类型。通常不需要使用 node:stream 模块来使用流。

本文档的组织结构

本文档包含两个主要部分和一个用于备注的第三部分。第一部分解释如何在应用程序中使用现有流。第二部分解释如何创建新的流类型。

流的类型

Node.js 中有四种基本的流类型:

此外,此模块还包括实用程序函数 stream.duplexPair()stream.pipeline()stream.finished()stream.Readable.from()stream.addAbortSignal()

流 Promise API

新增于:v15.0.0

stream/promises API 提供了一套替代的异步实用函数,用于返回 Promise 对象而不是使用回调的流。可以通过 require('node:stream/promises')require('node:stream').promises 访问该 API。

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[历史]

版本变更
v18.0.0, v17.2.0, v16.14.0添加了 end 选项,可以将其设置为 false 以防止在源结束时自动关闭目标流。
v15.0.0新增于:v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')

要使用 AbortSignal,请将其作为最后一个参数放在选项对象中。当信号中断时,将使用 AbortError 调用底层管道的 destroy 方法。

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

pipeline API 也支持异步生成器:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // 使用字符串而不是 `Buffer`。
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // 使用字符串而不是 `Buffer`。
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')

请记住处理传递给异步生成器的 signal 参数。尤其是在异步生成器是管道源(即第一个参数)的情况下,否则管道将永远不会完成。

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')

pipeline API 提供了 回调版本

stream.finished(stream[, options])

[历史]

版本变更
v19.5.0, v18.14.0添加了对 ReadableStreamWritableStream 的支持。
v19.1.0, v18.13.0添加了 cleanup 选项。
v15.0.0首次引入: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream 完成读取。')
}

run().catch(console.error)
rs.resume() // 耗尽流。
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream 完成读取。')
}

run().catch(console.error)
rs.resume() // 耗尽流。

finished API 还提供了一个 回调版本

stream.finished() 在返回的 promise 解析或拒绝后会留下悬空事件监听器(特别是 'error''end''finish''close')。这样做的原因是为了防止意外的 'error' 事件(由于不正确的流实现)导致意外崩溃。如果这是不需要的行为,则应将 options.cleanup 设置为 true

js
await finished(rs, { cleanup: true })

对象模式

所有由 Node.js API 创建的流都只操作字符串,<Buffer><TypedArray><DataView> 对象:

  • 字符串Buffer 是与流一起使用的最常见类型。
  • TypedArrayDataView 允许您使用 Int32ArrayUint8Array 等类型处理二进制数据。当您将 TypedArray 或 DataView 写入流时,Node.js 会处理原始字节。

但是,流实现可以使用其他类型的 JavaScript 值(null 除外,它在流中具有特殊用途)。此类流被认为是在“对象模式”下运行。

在创建流时,使用 objectMode 选项可以将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的。

缓冲

WritableReadable 流都会将数据存储在内部缓冲区中。

潜在的缓冲数据量取决于传递给流构造函数的 highWaterMark 选项。对于普通流,highWaterMark 选项指定总字节数。对于以对象模式运行的流,highWaterMark 指定对象的总数。对于处理(但不解码)字符串的流,highWaterMark 指定 UTF-16 代码单元的总数。

当实现调用 stream.push(chunk) 时,数据会在 Readable 流中缓冲。如果流的使用者没有调用 stream.read(),则数据将保留在内部队列中,直到被使用。

一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据被使用(也就是说,流将停止调用用于填充读取缓冲区的内部 readable._read() 方法)。

当重复调用 writable.write(chunk) 方法时,数据会在 Writable 流中缓冲。当内部写入缓冲区的总大小低于 highWaterMark 设置的阈值时,对 writable.write() 的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark,则返回 false

stream API,特别是 stream.pipe() 方法的一个主要目标是将数据的缓冲限制在可接受的水平,这样速度不同的源和目标就不会压垮可用内存。

highWaterMark 选项是一个阈值,而不是限制:它决定了流在停止请求更多数据之前缓冲的数据量。它通常不强制执行严格的内存限制。特定的流实现可以选择强制执行更严格的限制,但这只是可选的。

因为 DuplexTransform 流同时是 ReadableWritable,所以每个流维护两个独立的内部缓冲区用于读取和写入,允许每一侧独立于另一侧操作,同时保持适当和高效的数据流。例如,net.Socket 实例是 Duplex 流,其 Readable 端允许使用从套接字接收的数据,其 Writable 端允许向套接字写入数据。因为数据可能以比接收数据快或慢的速度写入套接字,所以每一侧都应该独立于另一侧操作(和缓冲)。

内部缓冲的机制是内部实现细节,可能会随时更改。但是,对于某些高级实现,可以使用 writable.writableBufferreadable.readableBuffer 获取内部缓冲区。不建议使用这些未公开的属性。

流消费者 API

几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流。以下是在 Node.js 应用程序中使用流的示例,该应用程序实现了一个 HTTP 服务器:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` 是一个 http.IncomingMessage,它是一个可读流。
  // `res` 是一个 http.ServerResponse,它是一个可写流。

  let body = ''
  // 获取数据作为 utf8 字符串。
  // 如果未设置编码,则将接收 Buffer 对象。
  req.setEncoding('utf8')

  // 可读流在添加侦听器后会发出 'data' 事件。
  req.on('data', chunk => {
    body += chunk
  })

  // 'end' 事件表示已接收整个主体。
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // 向用户写回一些有趣的内容:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // 糟糕!无效的 JSON!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

Writable 流(例如示例中的 res)公开诸如 write()end() 之类的方法,这些方法用于将数据写入流。

Readable 流使用 EventEmitter API 来通知应用程序代码何时可以从流中读取数据。可以使用多种方法从流中读取可用数据。

WritableReadable 流都使用 EventEmitter API 以各种方式来传达流的当前状态。

DuplexTransform 流同时是 WritableReadable

写入数据到流或从流中使用数据的应用程序不需要直接实现流接口,并且通常没有理由调用 require('node:stream')

希望实现新型流的开发者应该参考流实现者 API 部分。

可写流

可写流是对数据的目标写入位置的抽象。

Writable 流的示例包括:

其中一些示例实际上是实现了 Writable 接口的 Duplex 流。

所有 Writable 流都实现了 stream.Writable 类定义的接口。

虽然 Writable 流的具体实例可能在各个方面有所不同,但所有 Writable 流都遵循与以下示例中所示相同的根本使用模式:

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

类: stream.Writable

添加到: v0.9.4

事件: 'close'

[历史]

版本更改
v10.0.0添加 emitClose 选项以指定是否在销毁时发出 'close'
v0.9.4添加到: v0.9.4

当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close' 事件。该事件表示不会再发出任何事件,也不会发生进一步的计算。

如果 Writable 流使用 emitClose 选项创建,它将始终发出 'close' 事件。

事件: 'drain'

添加到: v0.9.4

如果对 stream.write(chunk) 的调用返回 false,则当适合恢复向流写入数据时,将发出 'drain' 事件。

js
// 将数据写入提供的可写流一百万次。
// 注意背压。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // 最后一次!
        writer.write(data, encoding, callback)
      } else {
        // 查看我们是否应该继续,或者等待。
        // 不要传递回调,因为我们还没有完成。
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // 必须提前停止!
      // 一旦它排空,就再写一些。
      writer.once('drain', write)
    }
  }
}
事件: 'error'

添加到: v0.9.4

如果在写入或管道传输数据时发生错误,则会发出 'error' 事件。当被调用时,监听器回调将传递一个单一的 Error 参数。

除非在创建流时将 autoDestroy 选项设置为 false,否则在发出 'error' 事件时流将关闭。

'error' 之后,不应发出除 'close' 之外的其他事件(包括 'error' 事件)。

事件: 'finish'

添加到: v0.9.4

在调用 stream.end() 方法并将所有数据刷新到底层系统之后,将发出 'finish' 事件。

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('所有写入现在都已完成。')
})
writer.end('这是结尾\n')
事件: 'pipe'

添加到: v0.9.4

当在可读流上调用 stream.pipe() 方法时,会发出 'pipe' 事件,将此可写流添加到其目标集中。

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('某些东西正在管道传输到写入器中。')
  assert.equal(src, reader)
})
reader.pipe(writer)
事件: 'unpipe'

添加到: v0.9.4

当在 Readable 流上调用 stream.unpipe() 方法时,会发出 'unpipe' 事件,将其从其目标集中移除此 Writable

如果此 Writable 流在 Readable 流管道传输到它时发出错误,也会发出此事件。

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('某些东西已停止管道传输到写入器中。')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

添加到: v0.11.2

writable.cork() 方法强制将所有写入的数据缓冲在内存中。缓冲的数据将在调用 stream.uncork()stream.end() 方法时刷新。

writable.cork() 的主要目的是适应这种情况:几个小的块快速连续地写入流。writable.cork() 不会立即将它们转发到底层目标,而是缓冲所有块,直到调用 writable.uncork(),这将把它们全部传递给 writable._writev()(如果存在)。这可以防止头部阻塞的情况,在这种情况下,数据在等待处理第一个小块时正在被缓冲。但是,如果不实现 writable._writev() 就使用 writable.cork() 可能会对吞吐量产生不利影响。

另请参阅:writable.uncork()writable._writev()

writable.destroy([error])

[历史]

版本更改
v14.0.0在已销毁的流上作为无操作工作。
v8.0.0添加到: v8.0.0
  • error <Error> 可选,一个要与 'error' 事件一起发出的错误。
  • 返回: <this>

销毁流。可选地发出 'error' 事件,并发出 'close' 事件(除非 emitClose 设置为 false)。在此调用之后,可写流已结束,后续对 write()end() 的调用将导致 ERR_STREAM_DESTROYED 错误。这是一种破坏性和立即销毁流的方式。之前对 write() 的调用可能尚未排空,并可能触发 ERR_STREAM_DESTROYED 错误。如果数据应在关闭之前刷新,请使用 end() 代替 destroy,或者在销毁流之前等待 'drain' 事件。

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

一旦调用了 destroy(),任何进一步的调用都将是无操作的,并且除了来自 _destroy() 的错误之外,不会发出任何其他错误作为 'error'

实现者不应覆盖此方法,而应实现 writable._destroy()

writable.closed

添加到: v18.0.0

在发出 'close' 后为 true

writable.destroyed

添加到: v8.0.0

在调用 writable.destroy() 后为 true

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[历史]

版本更改
v22.0.0, v20.13.0chunk 参数现在可以是 TypedArrayDataView 实例。
v15.0.0callback 在 'finish' 之前或发生错误时被调用。
v14.0.0如果发出 'finish' 或 'error',则调用 callback
v10.0.0此方法现在返回对 writable 的引用。
v8.0.0chunk 参数现在可以是 Uint8Array 实例。
v0.9.4添加到: v0.9.4

调用 writable.end() 方法表示不会再向 Writable 写入数据。可选的 chunkencoding 参数允许在关闭流之前立即写入最后一部分附加数据。

在调用 stream.end() 之后调用 stream.write() 方法将引发错误。

js
// 写入 'hello, ' 然后以 'world!' 结尾。
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// 现在不允许再写入!
writable.setDefaultEncoding(encoding)

[历史]

版本更改
v6.1.0此方法现在返回对 writable 的引用。
v0.11.15添加到: v0.11.15

writable.setDefaultEncoding() 方法设置 Writable 流的默认 encoding

writable.uncork()

添加到: v0.11.2

writable.uncork() 方法刷新自调用 stream.cork() 以来缓冲的所有数据。

当使用 writable.cork()writable.uncork() 来管理对流的写入缓冲时,使用 process.nextTick() 推迟对 writable.uncork() 的调用。这样做允许在给定的 Node.js 事件循环阶段对所有 writable.write() 调用进行批处理。

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

如果在流上多次调用 writable.cork() 方法,则必须调用相同数量的 writable.uncork() 调用才能刷新缓冲的数据。

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // 数据在第二次调用 uncork() 之前不会被刷新。
  stream.uncork()
})

另请参阅:writable.cork()

writable.writable

添加到: v11.4.0

如果可以安全地调用 writable.write(),则为 true,这意味着流尚未被销毁、出错或结束。

writable.writableAborted

添加到: v18.0.0, v16.17.0

[稳定: 1 - 实验性]

稳定: 1 稳定性: 1 - 实验性

返回在发出 'finish' 之前流是否被销毁或出错。

writable.writableEnded

添加到: v12.9.0

在调用 writable.end() 之后为 true。此属性并不表示数据是否已被刷新,为此请使用 writable.writableFinished

writable.writableCorked

添加到: v13.2.0, v12.16.0

需要调用 writable.uncork() 的次数才能完全取消流的 cork。

writable.errored

添加到: v18.0.0

如果流已使用错误被销毁,则返回错误。

writable.writableFinished

添加到: v12.6.0

在发出 'finish' 事件之前立即设置为 true

writable.writableHighWaterMark

添加到: v9.3.0

返回创建此 Writable 时传递的 highWaterMark 的值。

writable.writableLength

添加到: v9.4.0

此属性包含队列中准备写入的字节(或对象)数。该值提供有关 highWaterMark 状态的内省数据。

writable.writableNeedDrain

添加到: v15.2.0, v14.17.0

如果流的缓冲区已满并且流将发出 'drain',则为 true

writable.writableObjectMode

添加到: v12.3.0

给定 Writable 流的属性 objectMode 的 Getter。

writable[Symbol.asyncDispose]()

添加到: v22.4.0, v20.16.0

[稳定: 1 - 实验性]

稳定: 1 稳定性: 1 - 实验性

使用 AbortError 调用 writable.destroy(),并返回一个在流完成时完成的 promise。

writable.write(chunk[, encoding][, callback])

[历史]

版本更改
v22.0.0, v20.13.0chunk 参数现在可以是 TypedArrayDataView 实例。
v8.0.0chunk 参数现在可以是 Uint8Array 实例。
v6.0.0现在将 null 作为 chunk 参数传递始终被认为无效,即使在对象模式下也是如此。
v0.9.4添加到: v0.9.4

writable.write() 方法将一些数据写入流,并在数据完全处理后调用提供的 callback。如果发生错误,则 callback 将以错误作为其第一个参数被调用。callback 是异步调用的,并且在发出 'error' 之前调用。

如果在接纳 chunk 后内部缓冲区小于创建流时配置的 highWaterMark,则返回值为 true。如果返回 false,则应停止进一步尝试向流写入数据,直到发出 'drain' 事件。如果 write() 返回 false,请不要在发出 'drain' 事件之前写入更多块。虽然允许在流未排空时调用 write(),但 Node.js 将缓冲所有写入的块,直到达到最大内存使用量,此时它将无条件中止。即使在它中止之前,高内存使用率也会导致垃圾收集器性能下降和高 RSS(即使不再需要内存,通常也不会释放回系统)。由于如果远程对等方没有读取数据,TCP 套接字可能永远不会排空,因此写入未排空的套接字可能会导致可远程利用的漏洞。

在流未排空时写入数据对于 Transform 来说尤其成问题,因为 Transform 流默认情况下会暂停,直到它们被管道传输或添加 'data''readable' 事件处理程序。

如果要写入的数据可以按需生成或获取,建议将其逻辑封装到 Readable 中并使用 stream.pipe()。但是,如果更喜欢调用 write(),则可以使用 'drain' 事件来尊重背压并避免内存问题:

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// 等待 cb 被调用后再进行任何其他写入。
write('hello', () => {
  console.log('写入已完成,现在可以进行更多写入。')
})

处于对象模式的可写流将始终忽略 encoding 参数。

可读流

可读流是对数据的来源的抽象,从中可以消费数据。

Readable 流的示例包括:

所有 Readable 流都实现了 stream.Readable 类定义的接口。

两种读取模式

Readable 流实际上运行在两种模式之一:流式和暂停。这些模式与对象模式不同。Readable 流可以处于对象模式或非对象模式,而不管它是否处于流式模式或暂停模式。

  • 在流式模式下,数据会自动从底层系统读取,并尽可能快地通过 EventEmitter 接口的事件提供给应用程序。
  • 在暂停模式下,必须显式调用 stream.read() 方法从流中读取数据块。

所有 Readable 流都以暂停模式开始,但可以通过以下几种方式切换到流式模式:

Readable 可以使用以下方法之一切换回暂停模式:

  • 如果没有管道目标,则调用 stream.pause() 方法。
  • 如果有管道目标,则移除所有管道目标。可以通过调用 stream.unpipe() 方法移除多个管道目标。

需要记住的重要概念是,Readable 不会生成数据,除非提供了一种消费或忽略该数据的机制。如果消费机制被禁用或移除,Readable尝试停止生成数据。

出于向后兼容性的原因,移除 'data' 事件处理程序不会自动暂停流。此外,如果存在管道目标,则调用 stream.pause() 并不能保证流在这些目标清空并请求更多数据后会保持暂停状态。

如果将 Readable 切换到流式模式并且没有可用的使用者来处理数据,则该数据将丢失。例如,当调用 readable.resume() 方法时没有附加到 'data' 事件的侦听器,或者当从流中移除 'data' 事件处理程序时,就会发生这种情况。

添加 'readable' 事件处理程序会自动使流停止流动,并且必须通过 readable.read() 来使用数据。如果移除 'readable' 事件处理程序,则如果存在 'data' 事件处理程序,流将再次开始流动。

三种状态

Readable 流的“两种模式”是对 Readable 流内部实现中更复杂的内部状态管理的简化抽象。

具体来说,在任何给定时间点,每个 Readable 都处于三种可能状态之一:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

readable.readableFlowingnull 时,没有提供消耗流数据的机制。因此,流不会生成数据。在此状态下,附加 'data' 事件的监听器,调用 readable.pipe() 方法或调用 readable.resume() 方法将把 readable.readableFlowing 切换为 true,导致 Readable 开始主动发出数据生成的事件。

调用 readable.pause()readable.unpipe() 或接收背压将导致 readable.readableFlowing 设置为 false,暂时停止事件流,但不会停止数据生成。在此状态下,附加 'data' 事件的监听器不会将 readable.readableFlowing 切换为 true

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing 现在为 false。

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing 仍然为 false。
pass.write('ok') // 不会发出 'data' 事件。
pass.resume() // 必须调用此方法才能使流发出 'data' 事件。
// readableFlowing 现在为 true。

readable.readableFlowingfalse 时,数据可能会累积在流的内部缓冲区中。

选择一种 API 风格

Readable 流 API 在多个 Node.js 版本中不断发展,并提供多种方法来消费流数据。一般来说,开发者应该选择 一种 数据消费方法,并且绝不应该使用多种方法来从单个流中消费数据。具体来说,结合使用 on('data')on('readable')pipe() 或异步迭代器可能会导致难以预料的行为。

类: stream.Readable

添加到:v0.9.4

事件: 'close'

[历史]

版本更改
v10.0.0添加 emitClose 选项以指定是否在销毁时发出 'close'
v0.9.4添加到:v0.9.4

当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close' 事件。该事件表示不会再发出任何事件,也不会发生进一步的计算。

如果使用 emitClose 选项创建 Readable 流,则该流将始终发出 'close' 事件。

事件: 'data'

添加到:v0.9.4

  • chunk <Buffer> | <string> | <any> 数据块。对于非对象模式操作的流,数据块将是字符串或 Buffer。对于处于对象模式的流,数据块可以是任何 JavaScript 值,但 null 除外。

每当流将数据块的所有权交给消费者时,都会发出 'data' 事件。这可能发生在通过调用 readable.pipe()readable.resume() 或将监听器回调附加到 'data' 事件时,流以流式模式切换时。每当调用 readable.read() 方法并且有可返回的数据块时,也会发出 'data' 事件。

'data' 事件监听器附加到尚未显式暂停的流将使流进入流式模式。然后,数据将在可用时立即传递。

如果使用 readable.setEncoding() 方法为流指定了默认编码,则监听器回调将把数据块作为字符串传递;否则,数据将作为 Buffer 传递。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
})
事件: 'end'

添加到:v0.9.4

当流中没有更多数据可供使用时,将发出 'end' 事件。

除非数据完全被使用,否则不会发出 'end' 事件。这可以通过将流切换到流式模式,或者重复调用 stream.read() 直到所有数据都被使用来实现。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
  console.log('There will be no more data.')
})
事件: 'error'

添加到:v0.9.4

Readable 实现随时可能发出 'error' 事件。通常,如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效的数据块时,可能会发生这种情况。

监听器回调将传递单个 Error 对象。

事件: 'pause'

添加到:v0.9.4

当调用 stream.pause()readableFlowing 不为 false 时,将发出 'pause' 事件。

事件: 'readable'

[历史]

版本更改
v10.0.0'readable' 始终在调用 .push() 后的下一个 tick 中发出。
v10.0.0使用 'readable' 需要调用 .read()
v0.9.4添加到:v0.9.4

当有数据可供从流中读取时,最多可达配置的高水位线 (state.highWaterMark),将发出 'readable' 事件。实际上,它表示流在缓冲区中具有新信息。如果缓冲区中有可用数据,则可以调用 stream.read() 来检索该数据。此外,当流的末尾到达时,也可能会发出 'readable' 事件。

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // There is some data to read now.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

如果流的末尾已到达,则调用 stream.read() 将返回 null 并触发 'end' 事件。如果从未有任何数据可供读取,这也是正确的。例如,在以下示例中,foo.txt 是一个空文件:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

运行此脚本的输出为:

bash
$ node test.js
readable: null
end

在某些情况下,附加 'readable' 事件的监听器将导致将一定数量的数据读取到内部缓冲区中。

一般来说,readable.pipe()'data' 事件机制比 'readable' 事件更容易理解。但是,处理 'readable' 可能会提高吞吐量。

如果同时使用 'readable''data',则 'readable' 优先控制流,即只有在调用 stream.read() 时才会发出 'data'readableFlowing 属性将变为 false。如果在删除 'readable' 时存在 'data' 监听器,则流将开始流动,即 'data' 事件将在不调用 .resume() 的情况下发出。

事件: 'resume'

添加到:v0.9.4

当调用 stream.resume()readableFlowing 不为 true 时,将发出 'resume' 事件。

readable.destroy([error])

[历史]

版本更改
v14.0.0在已销毁的流上充当空操作。
v8.0.0添加到:v8.0.0
  • error <Error> 将作为有效负载传递到 'error' 事件中的错误
  • 返回: <this>

销毁流。可以选择发出 'error' 事件,并发出 'close' 事件(除非 emitClose 设置为 false)。此调用之后,可读流将释放任何内部资源,后续对 push() 的调用将被忽略。

一旦调用了 destroy(),任何进一步的调用都将是空操作,并且除了来自 _destroy() 的错误之外,不会再发出任何错误作为 'error'

实现者不应覆盖此方法,而应实现 readable._destroy()

readable.closed

添加到:v18.0.0

在发出 'close' 后为 true

readable.destroyed

添加到:v8.0.0

调用 readable.destroy() 后为 true

readable.isPaused()

添加到:v0.11.14

readable.isPaused() 方法返回 Readable 的当前运行状态。这主要由 readable.pipe() 方法的基础机制使用。在大多数典型情况下,没有理由直接使用此方法。

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

添加到:v0.9.4

readable.pause() 方法将导致处于流式模式的流停止发出 'data' 事件,从而退出流式模式。任何可用数据都将保留在内部缓冲区中。

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
  readable.pause()
  console.log('There will be no additional data for 1 second.')
  setTimeout(() => {
    console.log('Now data will start flowing again.')
    readable.resume()
  }, 1000)
})

如果存在 'readable' 事件监听器,则 readable.pause() 方法无效。

readable.pipe(destination[, options])

添加到:v0.9.4

readable.pipe() 方法将 Writable 流附加到 readable,使其自动切换到流式模式并将所有数据推送到附加的 Writable。数据流将自动管理,这样目标 Writable 流不会被更快的 Readable 流淹没。

以下示例将 readable 中的所有数据写入名为 file.txt 的文件中:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// All the data from readable goes into 'file.txt'.
readable.pipe(writable)

可以将多个 Writable 流附加到单个 Readable 流。

readable.pipe() 方法返回对 目标 流的引用,从而可以设置管道流的链:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

默认情况下,当源 Readable 流发出 'end' 时,会在目标 Writable 流上调用 stream.end(),以便目标不再可写。要禁用此默认行为,可以将 end 选项传递为 false,使目标流保持打开状态:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

一个重要的警告是,如果 Readable 流在处理过程中发出错误,则目标 Writable 不会自动关闭。如果发生错误,则必须手动关闭每个流才能防止内存泄漏。

process.stderrprocess.stdout Writable 流直到 Node.js 进程退出才关闭,而不管指定的选项如何。

readable.read([size])

添加到:v0.9.4

readable.read() 方法从内部缓冲区读取数据并将其返回。如果没有可用数据可供读取,则返回 null。默认情况下,数据以 Buffer 对象的形式返回,除非使用 readable.setEncoding() 方法指定了编码,或者流以对象模式运行。

可选的 size 参数指定要读取的特定字节数。如果无法读取 size 字节,则返回 null除非流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。

如果未指定 size 参数,则将返回内部缓冲区中包含的所有数据。

size 参数必须小于或等于 1 GiB。

readable.read() 方法只能在暂停模式下运行的 Readable 流上调用。在流式模式下,readable.read() 会自动调用,直到内部缓冲区完全清空。

js
const readable = getReadableStreamSomehow()

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk
  console.log('Stream is readable (new data received in buffer)')
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`)
  }
})

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.')
})

每次调用 readable.read() 都返回一个数据块或 null,表示此时没有更多数据可读。这些块不会自动连接。因为单个 read() 调用不会返回所有数据,所以可能需要使用 while 循环来连续读取块,直到检索所有数据。读取大型文件时,.read() 可能会暂时返回 null,这表示它已使用所有缓冲内容,但可能还有更多数据有待缓冲。在这种情况下,一旦缓冲区中有更多数据,就会发出新的 'readable' 事件,'end' 事件表示数据传输结束。

因此,要从 readable 中读取文件的全部内容,需要在多个 'readable' 事件中收集块:

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

处于对象模式的 Readable 流将始终从对 readable.read(size) 的调用中返回单个项目,而不管 size 参数的值如何。

如果 readable.read() 方法返回数据块,则还会发出 'data' 事件。

在发出 'end' 事件后调用 stream.read([size]) 将返回 null。不会引发运行时错误。

readable.readable

添加到:v11.4.0

如果可以安全地调用 readable.read(),则为 true,这意味着流尚未销毁或发出 'error''end'

readable.readableAborted

添加到:v16.8.0

[稳定:1 - 实验性]

稳定:1 稳定性:1 - 实验性

返回在发出 'end' 之前流是否已销毁或出错。

readable.readableDidRead

添加到:v16.7.0, v14.18.0

[稳定:1 - 实验性]

稳定:1 稳定性:1 - 实验性

返回是否已发出 'data'

readable.readableEncoding

添加到:v12.7.0

给定 Readable 流的属性 encoding 的 Getter。可以使用 readable.setEncoding() 方法设置 encoding 属性。

readable.readableEnded

添加到:v12.9.0

发出 'end' 事件时变为 true

readable.errored

添加到:v18.0.0

如果流已使用错误销毁,则返回错误。

readable.readableFlowing

添加到:v9.4.0

此属性反映了 三种状态 部分中描述的 Readable 流的当前状态。

readable.readableHighWaterMark

添加到:v9.3.0

返回创建此 Readable 时传递的 highWaterMark 值。

readable.readableLength

添加到:v9.4.0

此属性包含队列中准备读取的字节数(或对象数)。该值提供有关 highWaterMark 状态的内省数据。

readable.readableObjectMode

添加到:v12.3.0

给定 Readable 流的属性 objectMode 的 Getter。

readable.resume()

[历史]

版本更改
v10.0.0如果存在 'readable' 事件监听器,则 resume() 无效。
v0.9.4添加到:v0.9.4

readable.resume() 方法使显式暂停的 Readable 流恢复发出 'data' 事件,并将流切换到流式模式。

readable.resume() 方法可用于完全使用流中的数据,而无需实际处理任何数据:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.')
  })

如果存在 'readable' 事件监听器,则 readable.resume() 方法无效。

readable.setEncoding(encoding)

添加到:v0.9.4

readable.setEncoding() 方法设置从 Readable 流读取数据的字符编码。

默认情况下,不分配任何编码,流数据将作为 Buffer 对象返回。设置编码会导致流数据作为指定编码的字符串而不是 Buffer 对象返回。例如,调用 readable.setEncoding('utf8') 将使输出数据被解释为 UTF-8 数据,并作为字符串传递。调用 readable.setEncoding('hex') 将使数据以十六进制字符串格式编码。

Readable 流将正确处理通过流传递的多字节字符,否则如果只是从流中作为 Buffer 对象提取,这些字符将被解码不正确。

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Got %d characters of string data:', chunk.length)
})
readable.unpipe([destination])

添加到:v0.9.4

readable.unpipe() 方法分离先前使用 stream.pipe() 方法附加的 Writable 流。

如果未指定 destination,则分离所有管道。

如果指定了 destination,但没有为其设置管道,则该方法不执行任何操作。

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable)
setTimeout(() => {
  console.log('Stop writing to file.txt.')
  readable.unpipe(writable)
  console.log('Manually close the file stream.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[历史]

版本更改
v22.0.0, v20.13.0chunk 参数现在可以是 TypedArrayDataView 实例。
v8.0.0chunk 参数现在可以是 Uint8Array 实例。
v0.9.11添加到:v0.9.11

chunk 作为 null 传递表示流的结尾 (EOF),其行为与 readable.push(null) 相同,之后无法再写入任何数据。EOF 信号位于缓冲区的末尾,任何缓冲数据仍将被刷新。

readable.unshift() 方法将数据块推回到内部缓冲区中。这在某些情况下非常有用,在这些情况下,流由需要“取消使用”它已乐观地从源中提取的数据量的代码使用,以便可以将数据传递给其他方。

在发出 'end' 事件后,不能调用 stream.unshift(chunk) 方法,否则将引发运行时错误。

使用 stream.unshift() 的开发人员通常应该考虑改用 Transform 流。有关更多信息,请参阅 流实现者的 API 部分。

js
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Now the body of the message can be read from the stream.
        callback(null, header, stream)
        return
      }
      // Still reading the header.
      header += str
    }
  }
}

stream.push(chunk) 不同,stream.unshift(chunk) 不会通过重置流的内部读取状态来结束读取过程。如果在读取过程中(即在自定义流上的 stream._read() 实现中)调用 readable.unshift(),这可能会导致意外的结果。在调用 readable.unshift() 后立即使用 stream.push('') 将适当地重置读取状态,但是最好是在执行读取的过程中避免调用 readable.unshift()

readable.wrap(stream)

添加到:v0.9.4

在 Node.js 0.10 之前,流并未实现当前定义的整个 node:stream 模块 API。(有关更多信息,请参阅 兼容性)。

当使用发出 'data' 事件并且仅具有建议性的 stream.pause() 方法的旧版 Node.js 库时,可以使用 readable.wrap() 方法创建一个使用旧流作为其数据源的 Readable 流。

很少需要使用 readable.wrap(),但该方法已作为与旧版 Node.js 应用程序和库交互的便利功能提供。

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // etc.
})
readable[Symbol.asyncIterator]()

[历史]

版本更改
v11.14.0Symbol.asyncIterator 支持不再是实验性的。
v10.0.0添加到:v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

如果循环使用 breakreturnthrow 终止,则流将被销毁。换句话说,遍历流将完全使用流。流将以等于 highWaterMark 选项的大小分块读取。在上面的代码示例中,如果文件的数据少于 64 KiB,则数据将位于单个块中,因为没有向 fs.createReadStream() 提供 highWaterMark 选项。

readable[Symbol.asyncDispose]()

添加到:v20.4.0, v18.18.0

[稳定:1 - 实验性]

稳定:1 稳定性:1 - 实验性

使用 AbortError 调用 readable.destroy(),并返回一个在流完成时完成的 promise。

readable.compose(stream[, options])

添加到:v19.1.0, v18.13.0

[稳定:1 - 实验性]

稳定:1 稳定性:1 - 实验性

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // prints ['this', 'is', 'compose', 'as', 'operator']

有关更多信息,请参阅 stream.compose

readable.iterator([options])

**添加到:

双工流和转换流

类:stream.Duplex

[历史]

版本变更
v6.8.0Duplex 的实例在检查 instanceof stream.Writable 时现在返回 true
v0.9.4新增于:v0.9.4

双工流是同时实现 ReadableWritable 接口的流。

Duplex 流的示例包括:

duplex.allowHalfOpen

新增于:v0.9.4

如果为 false,则当可读端结束时,流将自动结束可写端。最初由 allowHalfOpen 构造函数选项设置,默认为 true

可以手动更改此选项来更改现有 Duplex 流实例的半打开行为,但必须在发出 'end' 事件之前更改。

类: stream.Transform

添加到: v0.9.4

转换流是 Duplex 流,其中输出与输入以某种方式相关。与所有 Duplex 流一样,Transform 流同时实现了 ReadableWritable 接口。

Transform 流的示例包括:

transform.destroy([error])

[历史]

版本更改
v14.0.0在已销毁的流上作为无操作工作。
v8.0.0添加到:v8.0.0

销毁流,并可选地发出 'error' 事件。此调用之后,转换流将释放任何内部资源。实现者不应覆盖此方法,而应实现 readable._destroy()Transform_destroy() 的默认实现也会发出 'close',除非 emitClose 设置为 false。

一旦调用了 destroy(),任何进一步的调用都将是无操作的,并且除了 _destroy() 之外,不会发出任何其他错误作为 'error'

stream.duplexPair([options])

新增于:v22.6.0, v20.17.0

  • options <Object> 传递给两个 Duplex 构造函数的值,用于设置缓冲等选项。
  • 返回值:包含两个 Duplex 实例的 <Array>

实用函数 duplexPair 返回一个包含两个项目的数组,每个项目都是一个与另一端连接的 Duplex 流:

js
const [sideA, sideB] = duplexPair()

写入一个流的内容在另一个流中变为可读。它提供了类似于网络连接的行为,客户端写入的数据可以被服务器读取,反之亦然。

Duplex 流是对称的;可以使用其中一个,而不会有任何行为差异。

stream.finished(stream[, options], callback)

[历史]

版本变更
v19.5.0添加了对 ReadableStreamWritableStream 的支持。
v15.11.0添加了 signal 选项。
v14.0.0finished(stream, cb) 将在调用回调之前等待 'close' 事件。实现尝试检测旧版流,并且仅将此行为应用于预期会发出 'close' 的流。
v14.0.0Readable 流上发出 'close' 事件早于 'end' 事件将导致 ERR_STREAM_PREMATURE_CLOSE 错误。
v14.0.0在调用 finished(stream, cb) 之前已完成的流上将调用回调。
v10.0.0新增于:v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> 可读和/或可写流/web 流。

  • options <Object>

    • error <boolean> 如果设置为 false,则对 emit('error', err) 的调用不被视为已完成。默认值: true
    • readable <boolean> 当设置为 false 时,即使流可能仍然可读,回调也会在流结束时被调用。默认值: true
    • writable <boolean> 当设置为 false 时,即使流可能仍然可写,回调也会在流结束时被调用。默认值: true
    • signal <AbortSignal> 允许中止等待流完成。如果信号被中止,底层流 不会 被中止。回调将使用 AbortError 被调用。此函数添加的所有注册侦听器也将被删除。
  • callback <Function> 一个回调函数,它接受一个可选的错误参数。

  • 返回值: <Function> 一个清理函数,用于移除所有已注册的监听器。

一个用于在流不再可读、可写或遇到错误或过早关闭事件时收到通知的函数。

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream failed.', err)
  } else {
    console.log('Stream is done reading.')
  }
})

rs.resume() // 冲洗流。

尤其适用于流过早销毁(例如中止的 HTTP 请求)并且不会发出 'end''finish' 的错误处理场景。

finished API 提供了 Promise 版本

stream.finished() 在调用 callback 后会留下悬空的事件监听器(特别是 'error''end''finish''close')。这样做的原因是为了防止意外的 'error' 事件(由于不正确的流实现)导致意外崩溃。如果这是不需要的行为,则需要在回调中调用返回的清理函数:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[历史]

版本变更
v19.7.0, v18.16.0新增对 Web Streams 的支持。
v18.0.0将无效回调传递给 callback 参数现在会抛出 ERR_INVALID_ARG_TYPE 而不是 ERR_INVALID_CALLBACK
v14.0.0pipeline(..., cb) 将在调用回调之前等待 'close' 事件。实现尝试检测旧版流,并且只对预期会发出 'close' 的流应用此行为。
v13.10.0新增对异步生成器的支持。
v10.0.0首次引入: v10.0.0

一个模块方法,用于在流和生成器之间进行管道传输,转发错误并正确清理,并在管道完成时提供回调。

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// 使用 pipeline API 可以轻松地将一系列流
// 连接在一起,并在管道完全完成时收到通知。

// 一个高效地 gzip 一个可能很大的 tar 文件的管道:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('管道失败。', err)
  } else {
    console.log('管道成功。')
  }
})

pipeline API 提供了一个 Promise 版本

stream.pipeline() 将调用 stream.destroy(err) 关闭所有流,除了:

  • 已发出 'end''close' 事件的可读流。
  • 已发出 'finish''close' 事件的可写流。

stream.pipeline() 在回调调用后会在流上留下悬空事件监听器。如果在失败后重用流,这会导致事件监听器泄漏和错误被吞没。如果最后一个流是可读的,则会移除悬空事件监听器,以便稍后可以再次使用最后一个流。

stream.pipeline() 在发生错误时关闭所有流。IncomingRequestpipeline 一起使用可能会导致意外行为,因为它会在没有发送预期响应的情况下销毁套接字。请参见下面的示例:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // 没有此文件
      // 一旦 `pipeline` 已销毁套接字,此消息就无法发送
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[历史]

版本变更
v21.1.0, v20.10.0添加了对流类的支持。
v19.8.0, v18.16.0添加了对 Web Streams 的支持。
v16.9.0在 v16.9.0 中添加

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - stream.compose 处于实验阶段。

将两个或多个流组合成一个 Duplex 流,该流写入第一个流并从最后一个流读取。每个提供的流都使用 stream.pipeline 连接到下一个流。如果任何流发生错误,则所有流都将被销毁,包括外部 Duplex 流。

因为 stream.compose 返回一个新的流,该流又可以(并且应该)连接到其他流,所以它实现了组合。相比之下,当将流传递给 stream.pipeline 时,第一个流通常是可读流,最后一个流是可写流,形成一个闭环。

如果传入一个 Function,它必须是一个工厂方法,接受一个 source Iterable

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // 输出 'HELLOWORLD'

stream.compose 可用于将异步迭代器、生成器和函数转换为流。

  • AsyncIterable 转换为可读的 Duplex。不能产生 null
  • AsyncGeneratorFunction 转换为可读/可写的转换 Duplex。必须以源 AsyncIterable 作为第一个参数。不能产生 null
  • AsyncFunction 转换为可写的 Duplex。必须返回 nullundefined
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// 将 AsyncIterable 转换为可读的 Duplex。
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// 将 AsyncGenerator 转换为转换 Duplex。
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// 将 AsyncFunction 转换为可写的 Duplex。
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // 输出 'HELLOWORLD'

参见 readable.compose(stream) 以了解 stream.compose 作为运算符的使用方法。

stream.Readable.from(iterable[, options])

新增于: v12.3.0, v10.17.0

  • iterable <Iterable> 实现 Symbol.asyncIteratorSymbol.iterator 可迭代协议的对象。如果传入空值,则会发出 'error' 事件。
  • options <Object> 提供给 new stream.Readable([options]) 的选项。默认情况下,Readable.from() 会将 options.objectMode 设置为 true,除非通过将 options.objectMode 设置为 false 明确选择退出。
  • 返回值: <stream.Readable>

一个用于根据迭代器创建可读流的实用方法。

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

出于性能原因,调用 Readable.from(string)Readable.from(buffer) 不会迭代字符串或缓冲区以匹配其他流的语义。

如果传入包含 Promise 的 Iterable 对象作为参数,则可能会导致未处理的拒绝。

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未处理的拒绝
])

stream.Readable.fromWeb(readableStream[, options])

新增于:v17.0.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

stream.Readable.isDisturbed(stream)

新增于:v16.8.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

返回流是否已被读取或取消。

stream.isErrored(stream)

新增于:v17.3.0, v16.14.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

返回流是否遇到错误。

stream.isReadable(stream)

新增于:v17.4.0, v16.14.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

返回流是否可读。

stream.Readable.toWeb(streamReadable[, options])

新增于: v17.0.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> 在对给定的 stream.Readable 应用背压读取之前,创建的 ReadableStream 的最大内部队列大小。如果未提供值,则将从给定的 stream.Readable 获取。
    • size <Function> 用于计算给定数据块大小的函数。如果未提供值,则所有块的大小都将为 1
    • chunk <any>
    • 返回值: <number>
  • 返回值: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

新增于: v17.0.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

stream.Writable.toWeb(streamWritable)

新增于: v17.0.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

stream.Duplex.from(src)

[历史]

版本变更
v19.5.0, v18.17.0src 参数现在可以是 ReadableStreamWritableStream
v16.8.0新增于:v16.8.0

用于创建双向流的实用方法。

  • Stream 将可写流转换为可写的 Duplex,并将可读流转换为 Duplex
  • Blob 转换为可读 Duplex
  • string 转换为可读 Duplex
  • ArrayBuffer 转换为可读 Duplex
  • AsyncIterable 转换为可读 Duplex。不能产生 null
  • AsyncGeneratorFunction 转换为可读/可写的转换 Duplex。必须以源 AsyncIterable 作为第一个参数。不能产生 null
  • AsyncFunction 转换为可写的 Duplex。必须返回 nullundefined
  • Object ({ writable, readable })readablewritable 转换为 Stream,然后将它们组合成 Duplex,其中 Duplex 将写入 writable 并从 readable 读取。
  • Promise 转换为可读 Duplex。值 null 将被忽略。
  • ReadableStream 转换为可读 Duplex
  • WritableStream 转换为可写的 Duplex
  • 返回值: <stream.Duplex>

如果传入包含 Promise 的 Iterable 对象作为参数,则可能会导致未处理的拒绝。

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未处理的拒绝
])

stream.Duplex.fromWeb(pair[, options])

新增于: v17.0.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

新增于: v17.0.0

[稳定性: 1 - 实验性]

稳定性: 1 - 实验性

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[历史]

版本变更
v19.7.0, v18.16.0添加了对 ReadableStreamWritableStream 的支持。
v15.4.0首次引入: v15.4.0

AbortSignal 附加到可读或可写流。这允许代码使用 AbortController 来控制流的销毁。

调用传递的 AbortSignal 对应的 AbortController 上的 abort 方法,其行为与在流上调用 .destroy(new AbortError()) 相同,对于 Web 流则为 controller.error(new AbortError())

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// 稍后,中止操作并关闭流
controller.abort()

或者使用带有可读流作为异步迭代器的 AbortSignal

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // 设置超时
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // 操作被取消
    } else {
      throw e
    }
  }
})()

或者使用带有 ReadableStreamAbortSignal

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // 操作被取消
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

新增于: v19.9.0, v18.17.0

返回流使用的默认 highWaterMark。默认为 65536 (64 KiB),objectModetrue 时为 16

stream.setDefaultHighWaterMark(objectMode, value)

新增于: v19.9.0, v18.17.0

设置流使用的默认 highWaterMark。

流实现者的 API

node:stream 模块的 API 旨在使使用 JavaScript 的原型继承模型轻松实现流成为可能。

首先,流开发者将声明一个扩展四个基本流类之一 (stream.Writablestream.Readablestream.Duplexstream.Transform) 的新 JavaScript 类,确保调用相应的父类构造函数:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

扩展流时,请记住用户可以在将这些选项转发给基构造函数之前提供哪些选项以及应该提供哪些选项。例如,如果实现对 autoDestroyemitClose 选项做出了假设,则不允许用户覆盖这些选项。明确说明转发了哪些选项,而不是隐式地转发所有选项。

然后,新的流类必须实现一个或多个特定方法,具体取决于正在创建的流的类型,如下表所述:

用例要实现的方法
只读Readable_read()
只写Writable_write() , _writev() , _final()
读写Duplex_read() , _write() , _writev() , _final()
对写入的数据进行操作,然后读取结果Transform_transform() , _flush() , _final()

流的实现代码绝不应调用专供使用者使用的流的“公共”方法(如流使用者的 API 部分所述)。这样做可能会导致使用该流的应用程序代码出现不良副作用。

避免覆盖公共方法,例如 write()end()cork()uncork()read()destroy(),或通过 .emit() 发出内部事件,例如 'error''data''end''finish''close'。这样做可能会破坏当前和未来的流不变性,从而导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。

简化构造

新增于:v1.2.0

对于许多简单的情况,可以无需继承即可创建一个流。这可以通过直接创建 stream.Writablestream.Readablestream.Duplexstream.Transform 对象的实例并传入适当的方法作为构造函数选项来实现。

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // 初始化状态并加载资源...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // 释放资源...
  },
})

实现可写流

stream.Writable 类被扩展以实现一个 Writable 流。

自定义 Writable必须调用 new stream.Writable([options]) 构造函数并实现 writable._write() 和/或 writable._writev() 方法。

new stream.Writable([options])

[历史]

版本变更
v22.0.0提高默认 highWaterMark。
v15.5.0支持传入 AbortSignal。
v14.0.0autoDestroy 选项的默认值更改为 true
v11.2.0, v10.16.0添加 autoDestroy 选项,以便在流发出 'finish' 或错误时自动 destroy() 流。
v10.0.0添加 emitClose 选项以指定是否在销毁时发出 'close'
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // 调用 stream.Writable() 构造函数。
    super(options)
    // ...
  }
}

或者,当使用 pre-ES6 样式构造函数时:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

或者,使用简化的构造函数方法:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

在对应于传递的 AbortSignalAbortController 上调用 abort 将与在可写流上调用 .destroy(new AbortError()) 的行为相同。

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// 稍后,中止操作并关闭流
controller.abort()

writable._construct(callback)

新增于:v15.0.0

  • callback <Function> 当流初始化完成时调用此函数(可选地带有一个错误参数)。

_construct() 方法绝不能直接调用。子类可以实现它,如果这样做了,则只会由内部的 Writable 类方法调用。

此可选函数将在流构造函数返回后的一个 tick 中被调用,将任何 _write()_final()_destroy() 调用延迟到调用 callback 之后。这对于在可以使用流之前初始化状态或异步初始化资源非常有用。

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[历史]

版本变更
v12.11.0提供 _writev() 时,_write() 可选。
  • chunk <Buffer> | <string> | <any> 将要写入的 Buffer,它由传递给 stream.write()string 转换而来。如果流的 decodeStrings 选项为 false 或流处于对象模式,则不会转换 chunk,它将保持传递给 stream.write() 的原始值。
  • encoding <string> 如果 chunk 是字符串,则 encoding 是该字符串的字符编码。如果 chunkBuffer,或者流处于对象模式,则可以忽略 encoding
  • callback <Function> 当提供的 chunk 处理完成时调用此函数(可以选择使用错误参数)。

所有 Writable 流实现都必须提供 writable._write() 和/或 writable._writev() 方法来将数据发送到底层资源。

Transform 流提供了 writable._write() 的自身实现。

此函数不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部 Writable 类方法调用。

callback 函数必须在 writable._write() 内部同步调用或异步调用(即不同的 tick),以信号写入成功完成或因错误而失败。如果调用失败,则传递给 callback 的第一个参数必须是 Error 对象;如果写入成功,则为 null

在调用 writable._write() 和调用 callback 之间发生的所有 writable.write() 调用都会导致写入的数据被缓冲。当调用 callback 时,流可能会发出 'drain' 事件。如果流实现能够一次处理多个数据块,则应实现 writable._writev() 方法。

如果在构造函数选项中将 decodeStrings 属性显式设置为 false,则 chunk 将保持与传递给 .write() 的相同对象,并且可能是字符串而不是 Buffer。这是为了支持对某些字符串数据编码进行优化处理的实现。在这种情况下,encoding 参数将指示字符串的字符编码。否则,可以安全地忽略 encoding 参数。

writable._write() 方法前缀带有一个下划线,因为它对定义它的类是内部的,并且不应该由用户程序直接调用。

writable._writev(chunks, callback)

  • chunks <Object[]> 待写入的数据。其值为一个 <Object> 数组,数组中的每个对象代表一个需要写入的离散数据块。这些对象的属性包括:

    • chunk <Buffer> | <string> 包含待写入数据的 Buffer 实例或字符串。如果 Writable 在创建时将 decodeStrings 选项设置为 false 并向 write() 传递了一个字符串,则 chunk 将为字符串。
    • encoding <string> chunk 的字符编码。如果 chunk 是一个 Buffer,则 encoding 将为 'buffer'
  • callback <Function> 一个回调函数(可选地包含一个错误参数),在为提供的块完成处理后调用。

此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Writable 类方法调用。

在能够一次处理多个数据块的流实现中,可以额外实现或替代 writable._write() 实现 writable._writev() 方法。如果已实现,并且存在来自先前写入的缓冲数据,则将调用 _writev() 而不是 _write()

writable._writev() 方法以下划线为前缀,因为它对定义它的类是内部的,并且永远不应该被用户程序直接调用。

writable._destroy(err, callback)

新增于: v8.0.0

  • err <Error> 一个可能的错误。
  • callback <Function> 一个回调函数,它可以接收一个可选的错误参数。

_destroy() 方法由 writable.destroy() 调用。子类可以重写它,但绝不能直接调用它。

writable._final(callback)

新增于: v8.0.0

  • callback <Function> 完成写入任何剩余数据后调用此函数(可选地带有一个错误参数)。

_final() 方法绝不能直接调用。子类可以实现它,如果实现了,则只会由内部 Writable 类方法调用。

这个可选函数会在流关闭之前被调用,将 'finish' 事件延迟到调用 callback 为止。这对于在流结束之前关闭资源或写入缓冲数据非常有用。

写入错误

在处理 writable._write()writable._writev()writable._final() 方法期间发生的错误必须通过调用回调并将错误作为第一个参数传递来传播。从这些方法内部抛出 Error 或手动发出 'error' 事件会导致未定义的行为。

如果 Readable 流在 Writable 发出错误时管道到 Writable 流,则 Readable 流将被取消管道连接。

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

可写流示例

以下说明了一个相当简单(并且有点无用)的自定义 Writable 流实现。虽然这个特定的 Writable 流实例没有任何实际的用途,但该示例说明了自定义 Writable 流实例所需的每个元素:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

可写流中解码缓冲区

解码缓冲区是一项常见任务,例如,当使用输入为字符串的转换器时。当使用多字节字符编码(如 UTF-8)时,这不是一个简单的过程。以下示例显示了如何使用 StringDecoderWritable 解码多字节字符串。

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

实现可读流

stream.Readable 类被扩展用于实现一个 Readable 流。

自定义 Readable必须调用 new stream.Readable([options]) 构造函数并实现 readable._read() 方法。

new stream.Readable([options])

[历史]

版本变更
v22.0.0提升默认 highWaterMark。
v15.5.0支持传入 AbortSignal。
v14.0.0autoDestroy 选项的默认值更改为 true
v11.2.0, v10.16.0添加 autoDestroy 选项,以便在流发出 'end' 或错误时自动 destroy() 流。
  • options <对象>
    • highWaterMark <数字> 内部缓冲区中存储的最大字节数,超过此数后将停止从底层资源读取。默认值: 65536 (64 KiB),对于 objectMode 流则为 16
    • encoding <字符串> 如果指定,则缓冲区将使用指定的编码解码为字符串。默认值: null
    • objectMode <布尔值> 此流是否应表现为对象流。这意味着 stream.read(n) 返回单个值而不是大小为 nBuffer默认值: false
    • emitClose <布尔值> 流是否在销毁后发出 'close' 事件。默认值: true
    • read <函数> stream._read() 方法的实现。
    • destroy <函数> stream._destroy() 方法的实现。
    • construct <函数> stream._construct() 方法的实现。
    • autoDestroy <布尔值> 此流是否应该在其结束之后自动调用 .destroy() 方法。默认值: true
    • signal <AbortSignal> 代表可能取消的信号。
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // 调用 stream.Readable(options) 构造函数。
    super(options)
    // ...
  }
}

或者,使用 pre-ES6 样式的构造函数:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

或者,使用简化的构造函数方法:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

对传入的 AbortSignal 对应的 AbortController 调用 abort 将与对创建的可读流调用 .destroy(new AbortError()) 的行为相同。

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// 稍后,中止操作并关闭流
controller.abort()

readable._construct(callback)

新增于:v15.0.0

  • callback <Function> 当流初始化完成时调用此函数(可选地带有一个错误参数)。

_construct() 方法绝不能直接调用。子类可以实现它,如果实现了,则仅由内部 Readable 类方法调用。

此可选函数将由流构造函数在下个 tick 中调度,将任何 _read()_destroy() 调用延迟到调用 callback 之后。这对于在可以使用流之前初始化状态或异步初始化资源非常有用。

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

新增于:v0.9.4

此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部的 Readable 类方法调用。

所有 Readable 流实现都必须提供 readable._read() 方法的实现,以从底层资源获取数据。

当调用 readable._read() 时,如果资源中有可用数据,则实现应该开始使用 this.push(dataChunk) 方法将数据推入读取队列。一旦流准备好接受更多数据,在每次调用 this.push(dataChunk) 之后,将再次调用 _read()_read() 可以继续从资源读取数据并推送数据,直到 readable.push() 返回 false。只有当 _read() 在停止后再次被调用时,它才能恢复将额外的数据推入队列。

一旦调用了 readable._read() 方法,在通过 readable.push() 方法推送更多数据之前,将不会再次调用它。空数据(例如空缓冲区和字符串)不会导致调用 readable._read()

size 参数是建议性的。对于“读取”是一个返回数据的单个操作的实现,可以使用 size 参数来确定要获取多少数据。其他实现可能会忽略此参数,并在数据可用时提供数据。无需等到有 size 字节可用再调用 stream.push(chunk)

readable._read() 方法以下划线为前缀,因为它在定义它的类中是内部的,用户程序绝不应该直接调用它。

readable._destroy(err, callback)

新增于: v8.0.0

  • err <Error> 一个可能的错误。
  • callback <Function> 一个回调函数,它接受一个可选的错误参数。

_destroy() 方法由 readable.destroy() 调用。子类可以重写它,但绝不能直接调用它。

readable.push(chunk[, encoding])

[历史]

版本变更
v22.0.0, v20.13.0chunk 参数现在可以是 TypedArrayDataView 实例。
v8.0.0chunk 参数现在可以是 Uint8Array 实例。

chunk<Buffer><TypedArray><DataView><string> 时,数据块 chunk 将被添加到内部队列中,供流的用户使用。将 chunk 作为 null 传递表示流的结束 (EOF),之后将无法再写入任何数据。

Readable 以暂停模式运行时,可以使用 readable.read() 方法在发出 'readable' 事件时读取使用 readable.push() 添加的数据。

Readable 以流式模式运行时,使用 readable.push() 添加的数据将通过发出 'data' 事件来传递。

readable.push() 方法的设计尽可能灵活。例如,当包装提供某种暂停/恢复机制和数据回调的较低级别源时,可以使用自定义 Readable 实例包装低级别源:

js
// `_source` 是一个具有 readStop() 和 readStart() 方法的对象,
// 以及一个在有数据时被调用的 `ondata` 成员,
// 和一个在数据结束时被调用的 `onend` 成员。

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // 每当有数据时,将其推入内部缓冲区。
    this._source.ondata = chunk => {
      // 如果 push() 返回 false,则停止从源读取。
      if (!this.push(chunk)) this._source.readStop()
    }

    // 当源结束时,推送 EOF 信号 `null` 块。
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() 将在流想要拉取更多数据时被调用。
  // 此情况下忽略建议的大小参数。
  _read(size) {
    this._source.readStart()
  }
}

readable.push() 方法用于将内容推入内部缓冲区。它可以由 readable._read() 方法驱动。

对于非对象模式的流,如果 readable.push()chunk 参数为 undefined,则将其视为空字符串或缓冲区。有关更多信息,请参见 readable.push('')

读取时的错误

在处理 readable._read() 期间发生的错误必须通过 readable.destroy(err) 方法传播。从 readable._read() 中抛出 Error 或手动发出 'error' 事件会导致未定义的行为。

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // 执行某些操作。
    }
  },
})

一个计数流示例

以下是 Readable 流的基本示例,它按升序发出从 1 到 1,000,000 的数字,然后结束。

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

实现双工流

Duplex 流同时实现了 ReadableWritable,例如 TCP 套接字连接。

由于 JavaScript 不支持多重继承,因此扩展 stream.Duplex 类来实现 Duplex 流(而不是扩展 stream.Readable stream.Writable 类)。

stream.Duplex 类原型继承自 stream.Readable,并寄生继承自 stream.Writable,但由于在 stream.Writable 上重写了 Symbol.hasInstanceinstanceof 将对这两个基类都正常工作。

自定义 Duplex必须调用 new stream.Duplex([options]) 构造函数并同时实现 readable._read()writable._write() 方法。

new stream.Duplex(options)

[历史]

版本变更
v8.4.0现在支持 readableHighWaterMarkwritableHighWaterMark 选项。
  • options <对象> 传递给 WritableReadable 构造函数。还具有以下字段:
    • allowHalfOpen <布尔值> 如果设置为 false,则当可读端结束时,流将自动结束可写端。默认值: true
    • readable <布尔值> 设置 Duplex 是否应可读。默认值: true
    • writable <布尔值> 设置 Duplex 是否应可写。默认值: true
    • readableObjectMode <布尔值> 为流的可读端设置 objectMode。如果 objectModetrue,则无效。默认值: false
    • writableObjectMode <布尔值> 为流的可写端设置 objectMode。如果 objectModetrue,则无效。默认值: false
    • readableHighWaterMark <数字> 为流的可读端设置 highWaterMark。如果提供了 highWaterMark,则无效。
    • writableHighWaterMark <数字> 为流的可写端设置 highWaterMark。如果提供了 highWaterMark,则无效。
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

或者,使用 pre-ES6 样式的构造函数:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

或者,使用简化的构造函数方法:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

使用管道时:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // 接受字符串输入而不是缓冲区
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // 确保是有效的 JSON。
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

一个双工流示例

以下展示了一个简单的Duplex流示例,它包装了一个假设的低级源对象,可以向该对象写入数据,并从中读取数据,尽管使用的 API 与 Node.js 流不兼容。以下展示了一个简单的Duplex流示例,它通过Writable接口缓冲传入的写入数据,并通过Readable接口读取回数据。

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // 底层源只处理字符串。
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

Duplex流最重要的方面是,ReadableWritable端尽管共存于单个对象实例中,但它们彼此独立运行。

对象模式双工流

对于 Duplex 流,可以使用 readableObjectModewritableObjectMode 选项分别为 ReadableWritable 端独占设置 objectMode

在下面的示例中,例如,创建一个新的 Transform 流(它是 Duplex 流的一种类型),它具有一个对象模式 Writable 端,该端接受转换为十六进制字符串的 JavaScript 数字在 Readable 端。

js
const { Transform } = require('node:stream')

// 所有 Transform 流也是 Duplex 流。
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // 根据需要将块强制转换为数字。
    chunk |= 0

    // 将块转换为其他内容。
    const data = chunk.toString(16)

    // 将数据推送到可读队列。
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// 打印:01
myTransform.write(10)
// 打印:0a
myTransform.write(100)
// 打印:64

实现转换流

Transform 流是一个 Duplex 流,其中输出通过某种方式从输入计算得出。例如,zlib 流或 crypto 流可以压缩、加密或解密数据。

输出不需要与输入大小相同、块数相同或到达时间相同。例如,Hash 流只会有一个输出块,该块在输入结束时提供。zlib 流产生的输出可能比其输入小得多或大得多。

stream.Transform 类被扩展以实现 Transform 流。

stream.Transform 类原型继承自 stream.Duplex 并实现其自身的 writable._write()readable._read() 方法版本。自定义 Transform 实现必须实现 transform._transform() 方法,并且也可以实现 transform._flush() 方法。

使用 Transform 流时必须小心,因为写入流的数据可能会导致流的可写端暂停,如果可读端的输出未被使用。

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

或者,使用 ES6 之前的构造函数风格:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

或者,使用简化的构造函数方法:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

事件: 'end'

'end' 事件来自 stream.Readable 类。在所有数据输出后发出 'end' 事件,这发生在 transform._flush() 中的回调函数被调用之后。如果发生错误,则不应发出 'end' 事件。

事件: 'finish'

'finish' 事件来自 stream.Writable 类。在调用 stream.end() 并且所有块都已由 stream._transform() 处理之后,将发出 'finish' 事件。如果发生错误,则不应发出 'finish' 事件。

transform._flush(callback)

  • callback <Function> 一个回调函数(可选地带有一个错误参数和数据),用于在剩余数据被刷新后调用。

此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Readable 类方法调用。

在某些情况下,转换操作可能需要在流的末尾发出额外的少量数据。例如,zlib 压缩流将存储用于最佳压缩输出的内部状态量。但是,当流结束时,需要刷新这些额外的数据,以便压缩数据完整。

自定义 Transform 实现可以实现 transform._flush() 方法。当没有更多写入的数据要使用时,但在发出 'end' 事件以指示 Readable 流结束之前,将调用此方法。

transform._flush() 实现中,可以根据需要调用零次或多次 transform.push() 方法。当刷新操作完成后,必须调用 callback 函数。

transform._flush() 方法以下划线为前缀,因为它是定义它的类的内部方法,用户程序绝不应该直接调用它。

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> 将要转换的Buffer,由传递给stream.write()string转换而来。如果流的decodeStrings选项为false或流处于对象模式,则不会转换chunk,它将保持传递给stream.write() 的任何值。
  • encoding <string> 如果chunk是字符串,则这是编码类型。如果chunk是缓冲区,则这是特殊值'buffer'。在这种情况下忽略它。
  • callback <Function> 一个回调函数(可选地带有一个错误参数和数据),在提供的chunk被处理后调用。

此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部Readable类方法调用。

所有Transform流实现必须提供一个_transform()方法来接受输入并产生输出。transform._transform()实现处理正在写入的字节,计算输出,然后使用transform.push()方法将该输出传递给可读部分。

transform.push()方法可以调用零次或多次以从单个输入块生成输出,这取决于由于块而要输出多少内容。

有可能从任何给定的输入数据块都不会生成输出。

只有当当前块完全被消耗时,才能调用callback函数。如果在处理输入时发生错误,则传递给callback的第一个参数必须是Error对象,否则为null。如果传递给callback的第二个参数,它将被转发到transform.push()方法,但前提是第一个参数为假值。换句话说,以下是等效的:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

transform._transform()方法以下划线为前缀,因为它对定义它的类是内部的,并且用户程序绝不应该直接调用它。

transform._transform() 从不会并行调用;流实现了一种队列机制,要接收下一个块,必须调用callback,同步或异步均可。

类: stream.PassThrough

stream.PassThrough 类是对 Transform 流的一种简单的实现,它只是将输入字节传递到输出。其主要用途是用于示例和测试,但在某些情况下,stream.PassThrough 作为新型流的构建块也很有用。

附加说明

流与异步生成器和异步迭代器的兼容性

随着 JavaScript 中对异步生成器和迭代器的支持,异步生成器此时实际上是一种一流的语言级流构造。

下面提供了一些使用 Node.js 流与异步生成器和异步迭代器的常见互操作案例。

使用异步迭代器使用可读流

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

异步迭代器在流上注册一个永久错误处理程序,以防止任何未处理的销毁后错误。

使用异步生成器创建可读流

可以使用 Readable.from() 工具方法从异步生成器创建 Node.js 可读流:

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

从异步迭代器管道到可写流

从异步迭代器写入可写流时,请确保正确处理背压和错误。stream.pipeline() 隐藏了背压和背压相关错误的处理:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// 回调模式
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Promise 模式
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

与旧版 Node.js 的兼容性

在 Node.js 0.10 之前,Readable 流接口比较简单,但也功能较弱,实用性较差。

  • 不会等待对 stream.read() 方法的调用,'data' 事件会立即开始发出。需要执行一定量的工作来决定如何处理数据的应用程序需要将读取的数据存储到缓冲区中,以免数据丢失。
  • stream.pause() 方法是建议性的,而不是保证性的。这意味着即使流处于暂停状态,仍然需要准备好接收 'data' 事件。

在 Node.js 0.10 中,添加了 Readable 类。为了与旧版 Node.js 程序向后兼容,当添加 'data' 事件处理程序或调用 stream.resume() 方法时,Readable 流会切换到“流动模式”。其效果是,即使不使用新的 stream.read() 方法和 'readable' 事件,也不必担心丢失 'data' 块。

虽然大多数应用程序将继续正常运行,但这会在以下情况下引入一个边缘情况:

  • 没有添加 'data' 事件监听器。
  • 从未调用 stream.resume() 方法。
  • 流未连接到任何可写的目标。

例如,考虑以下代码:

js
// WARNING!  BROKEN!
net
  .createServer(socket => {
    // 我们添加了一个 'end' 监听器,但从未使用过数据。
    socket.on('end', () => {
      // 它永远不会到达这里。
      socket.end('The message was received but was not processed.\n')
    })
  })
  .listen(1337)

在 Node.js 0.10 之前,传入的消息数据将被简单地丢弃。但是,在 Node.js 0.10 及更高版本中,套接字将永远保持暂停状态。

在这种情况下,解决方法是调用 stream.resume() 方法以开始数据流:

js
// 解决方法。
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('The message was received but was not processed.\n')
    })

    // 开始数据流,并丢弃它。
    socket.resume()
  })
  .listen(1337)

除了新的 Readable 流切换到流动模式外,还可以使用 readable.wrap() 方法将 0.10 之前的样式流包装在 Readable 类中。

readable.read(0)

在某些情况下,需要触发底层可读流机制的刷新,而无需实际消耗任何数据。在这种情况下,可以调用 readable.read(0),它将始终返回 null

如果内部读取缓冲区低于 highWaterMark,并且流当前未读取,则调用 stream.read(0) 将触发低级 stream._read() 调用。

虽然大多数应用程序几乎不需要这样做,但在 Node.js 中存在这种情况,尤其是在 Readable 流类内部。

readable.push('')

不建议使用 readable.push('')

向非对象模式的流推送零字节的 <string><Buffer><TypedArray><DataView> 会产生一个有趣的副作用。因为它readable.push() 的调用,所以该调用将结束读取过程。但是,由于参数是空字符串,因此不会向可读缓冲区添加任何数据,因此用户无法消费任何内容。

调用 readable.setEncoding()highWaterMark 的差异

使用 readable.setEncoding() 会改变非对象模式下 highWaterMark 的运行方式。

通常,当前缓冲区的大小是用字节来衡量,并与 highWaterMark 进行比较。但是,在调用 setEncoding() 之后,比较函数将开始用字符来衡量缓冲区的大小。

对于 latin1ascii,这通常不是问题。但是,在处理可能包含多字节字符的字符串时,建议注意这种行为。