流
源代码: lib/stream.js
流是用于在 Node.js 中处理流式数据的抽象接口。node:stream
模块提供了一个实现流接口的 API。
Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求 和 process.stdout
都是流实例。
流可以是可读的、可写的,或者两者都是。所有流都是 EventEmitter
的实例。
要访问 node:stream
模块:
const stream = require('node:stream')
node:stream
模块用于创建新的流实例类型。通常不需要使用 node:stream
模块来使用流。
本文档的组织结构
本文档包含两个主要部分和一个用于备注的第三部分。第一部分解释如何在应用程序中使用现有流。第二部分解释如何创建新的流类型。
流的类型
Node.js 中有四种基本的流类型:
Writable
:可以写入数据的流(例如,fs.createWriteStream()
)。Readable
:可以从中读取数据的流(例如,fs.createReadStream()
)。Duplex
:同时是Readable
和Writable
的流(例如,net.Socket
)。Transform
:可以修改或转换数据在写入和读取过程中的Duplex
流(例如,zlib.createDeflate()
)。
此外,此模块还包括实用程序函数 stream.duplexPair()
、stream.pipeline()
、stream.finished()
、stream.Readable.from()
和 stream.addAbortSignal()
。
流 Promise API
新增于:v15.0.0
stream/promises
API 提供了一套替代的异步实用函数,用于返回 Promise
对象而不是使用回调的流。可以通过 require('node:stream/promises')
或 require('node:stream').promises
访问该 API。
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[历史]
版本 | 变更 |
---|---|
v18.0.0, v17.2.0, v16.14.0 | 添加了 end 选项,可以将其设置为 false 以防止在源结束时自动关闭目标流。 |
v15.0.0 | 新增于:v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 返回值: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 返回值: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 返回值: <Promise> | <AsyncIterable>
options
<Object> 管道选项signal
<AbortSignal>end
<boolean> 源流结束时结束目标流。即使此值为false
,转换流也会始终结束。默认值:true
。
返回值: <Promise> 管道完成后完成。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
要使用 AbortSignal
,请将其作为最后一个参数放在选项对象中。当信号中断时,将使用 AbortError
调用底层管道的 destroy
方法。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
pipeline
API 也支持异步生成器:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // 使用字符串而不是 `Buffer`。
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // 使用字符串而不是 `Buffer`。
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
请记住处理传递给异步生成器的 signal
参数。尤其是在异步生成器是管道源(即第一个参数)的情况下,否则管道将永远不会完成。
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
pipeline
API 提供了 回调版本:
stream.finished(stream[, options])
[历史]
版本 | 变更 |
---|---|
v19.5.0, v18.14.0 | 添加了对 ReadableStream 和 WritableStream 的支持。 |
v19.1.0, v18.13.0 | 添加了 cleanup 选项。 |
v15.0.0 | 首次引入: v15.0.0 |
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream 完成读取。')
}
run().catch(console.error)
rs.resume() // 耗尽流。
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream 完成读取。')
}
run().catch(console.error)
rs.resume() // 耗尽流。
finished
API 还提供了一个 回调版本。
stream.finished()
在返回的 promise 解析或拒绝后会留下悬空事件监听器(特别是 'error'
、'end'
、'finish'
和 'close'
)。这样做的原因是为了防止意外的 'error'
事件(由于不正确的流实现)导致意外崩溃。如果这是不需要的行为,则应将 options.cleanup
设置为 true
:
await finished(rs, { cleanup: true })
对象模式
所有由 Node.js API 创建的流都只操作字符串,<Buffer>,<TypedArray> 和 <DataView> 对象:
字符串
和Buffer
是与流一起使用的最常见类型。TypedArray
和DataView
允许您使用Int32Array
或Uint8Array
等类型处理二进制数据。当您将 TypedArray 或 DataView 写入流时,Node.js 会处理原始字节。
但是,流实现可以使用其他类型的 JavaScript 值(null
除外,它在流中具有特殊用途)。此类流被认为是在“对象模式”下运行。
在创建流时,使用 objectMode
选项可以将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的。
缓冲
Writable
和 Readable
流都会将数据存储在内部缓冲区中。
潜在的缓冲数据量取决于传递给流构造函数的 highWaterMark
选项。对于普通流,highWaterMark
选项指定总字节数。对于以对象模式运行的流,highWaterMark
指定对象的总数。对于处理(但不解码)字符串的流,highWaterMark
指定 UTF-16 代码单元的总数。
当实现调用 stream.push(chunk)
时,数据会在 Readable
流中缓冲。如果流的使用者没有调用 stream.read()
,则数据将保留在内部队列中,直到被使用。
一旦内部读取缓冲区的总大小达到 highWaterMark
指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据被使用(也就是说,流将停止调用用于填充读取缓冲区的内部 readable._read()
方法)。
当重复调用 writable.write(chunk)
方法时,数据会在 Writable
流中缓冲。当内部写入缓冲区的总大小低于 highWaterMark
设置的阈值时,对 writable.write()
的调用将返回 true
。一旦内部缓冲区的大小达到或超过 highWaterMark
,则返回 false
。
stream
API,特别是 stream.pipe()
方法的一个主要目标是将数据的缓冲限制在可接受的水平,这样速度不同的源和目标就不会压垮可用内存。
highWaterMark
选项是一个阈值,而不是限制:它决定了流在停止请求更多数据之前缓冲的数据量。它通常不强制执行严格的内存限制。特定的流实现可以选择强制执行更严格的限制,但这只是可选的。
因为 Duplex
和 Transform
流同时是 Readable
和 Writable
,所以每个流维护两个独立的内部缓冲区用于读取和写入,允许每一侧独立于另一侧操作,同时保持适当和高效的数据流。例如,net.Socket
实例是 Duplex
流,其 Readable
端允许使用从套接字接收的数据,其 Writable
端允许向套接字写入数据。因为数据可能以比接收数据快或慢的速度写入套接字,所以每一侧都应该独立于另一侧操作(和缓冲)。
内部缓冲的机制是内部实现细节,可能会随时更改。但是,对于某些高级实现,可以使用 writable.writableBuffer
或 readable.readableBuffer
获取内部缓冲区。不建议使用这些未公开的属性。
流消费者 API
几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流。以下是在 Node.js 应用程序中使用流的示例,该应用程序实现了一个 HTTP 服务器:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` 是一个 http.IncomingMessage,它是一个可读流。
// `res` 是一个 http.ServerResponse,它是一个可写流。
let body = ''
// 获取数据作为 utf8 字符串。
// 如果未设置编码,则将接收 Buffer 对象。
req.setEncoding('utf8')
// 可读流在添加侦听器后会发出 'data' 事件。
req.on('data', chunk => {
body += chunk
})
// 'end' 事件表示已接收整个主体。
req.on('end', () => {
try {
const data = JSON.parse(body)
// 向用户写回一些有趣的内容:
res.write(typeof data)
res.end()
} catch (er) {
// 糟糕!无效的 JSON!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable
流(例如示例中的 res
)公开诸如 write()
和 end()
之类的方法,这些方法用于将数据写入流。
Readable
流使用 EventEmitter
API 来通知应用程序代码何时可以从流中读取数据。可以使用多种方法从流中读取可用数据。
Writable
和 Readable
流都使用 EventEmitter
API 以各种方式来传达流的当前状态。
Duplex
和 Transform
流同时是 Writable
和 Readable
。
写入数据到流或从流中使用数据的应用程序不需要直接实现流接口,并且通常没有理由调用 require('node:stream')
。
希望实现新型流的开发者应该参考流实现者 API 部分。
可写流
可写流是对数据的目标写入位置的抽象。
Writable
流的示例包括:
其中一些示例实际上是实现了 Writable
接口的 Duplex
流。
所有 Writable
流都实现了 stream.Writable
类定义的接口。
虽然 Writable
流的具体实例可能在各个方面有所不同,但所有 Writable
流都遵循与以下示例中所示相同的根本使用模式:
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
类: stream.Writable
添加到: v0.9.4
事件: 'close'
[历史]
版本 | 更改 |
---|---|
v10.0.0 | 添加 emitClose 选项以指定是否在销毁时发出 'close' 。 |
v0.9.4 | 添加到: v0.9.4 |
当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close'
事件。该事件表示不会再发出任何事件,也不会发生进一步的计算。
如果 Writable
流使用 emitClose
选项创建,它将始终发出 'close'
事件。
事件: 'drain'
添加到: v0.9.4
如果对 stream.write(chunk)
的调用返回 false
,则当适合恢复向流写入数据时,将发出 'drain'
事件。
// 将数据写入提供的可写流一百万次。
// 注意背压。
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// 最后一次!
writer.write(data, encoding, callback)
} else {
// 查看我们是否应该继续,或者等待。
// 不要传递回调,因为我们还没有完成。
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// 必须提前停止!
// 一旦它排空,就再写一些。
writer.once('drain', write)
}
}
}
事件: 'error'
添加到: v0.9.4
如果在写入或管道传输数据时发生错误,则会发出 'error'
事件。当被调用时,监听器回调将传递一个单一的 Error
参数。
除非在创建流时将 autoDestroy
选项设置为 false
,否则在发出 'error'
事件时流将关闭。
在 'error'
之后,不应发出除 'close'
之外的其他事件(包括 'error'
事件)。
事件: 'finish'
添加到: v0.9.4
在调用 stream.end()
方法并将所有数据刷新到底层系统之后,将发出 'finish'
事件。
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('所有写入现在都已完成。')
})
writer.end('这是结尾\n')
事件: 'pipe'
添加到: v0.9.4
src
<stream.Readable> 将数据管道传输到此可写流的源流
当在可读流上调用 stream.pipe()
方法时,会发出 'pipe'
事件,将此可写流添加到其目标集中。
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('某些东西正在管道传输到写入器中。')
assert.equal(src, reader)
})
reader.pipe(writer)
事件: 'unpipe'
添加到: v0.9.4
src
<stream.Readable> 取消管道传输此可写流的源流
当在 Readable
流上调用 stream.unpipe()
方法时,会发出 'unpipe'
事件,将其从其目标集中移除此 Writable
。
如果此 Writable
流在 Readable
流管道传输到它时发出错误,也会发出此事件。
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('某些东西已停止管道传输到写入器中。')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
添加到: v0.11.2
writable.cork()
方法强制将所有写入的数据缓冲在内存中。缓冲的数据将在调用 stream.uncork()
或 stream.end()
方法时刷新。
writable.cork()
的主要目的是适应这种情况:几个小的块快速连续地写入流。writable.cork()
不会立即将它们转发到底层目标,而是缓冲所有块,直到调用 writable.uncork()
,这将把它们全部传递给 writable._writev()
(如果存在)。这可以防止头部阻塞的情况,在这种情况下,数据在等待处理第一个小块时正在被缓冲。但是,如果不实现 writable._writev()
就使用 writable.cork()
可能会对吞吐量产生不利影响。
另请参阅:writable.uncork()
、writable._writev()
。
writable.destroy([error])
[历史]
版本 | 更改 |
---|---|
v14.0.0 | 在已销毁的流上作为无操作工作。 |
v8.0.0 | 添加到: v8.0.0 |
销毁流。可选地发出 'error'
事件,并发出 'close'
事件(除非 emitClose
设置为 false
)。在此调用之后,可写流已结束,后续对 write()
或 end()
的调用将导致 ERR_STREAM_DESTROYED
错误。这是一种破坏性和立即销毁流的方式。之前对 write()
的调用可能尚未排空,并可能触发 ERR_STREAM_DESTROYED
错误。如果数据应在关闭之前刷新,请使用 end()
代替 destroy,或者在销毁流之前等待 'drain'
事件。
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
一旦调用了 destroy()
,任何进一步的调用都将是无操作的,并且除了来自 _destroy()
的错误之外,不会发出任何其他错误作为 'error'
。
实现者不应覆盖此方法,而应实现 writable._destroy()
。
writable.closed
添加到: v18.0.0
在发出 'close'
后为 true
。
writable.destroyed
添加到: v8.0.0
在调用 writable.destroy()
后为 true
。
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[历史]
版本 | 更改 |
---|---|
v22.0.0, v20.13.0 | chunk 参数现在可以是 TypedArray 或 DataView 实例。 |
v15.0.0 | callback 在 'finish' 之前或发生错误时被调用。 |
v14.0.0 | 如果发出 'finish' 或 'error',则调用 callback 。 |
v10.0.0 | 此方法现在返回对 writable 的引用。 |
v8.0.0 | chunk 参数现在可以是 Uint8Array 实例。 |
v0.9.4 | 添加到: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可选数据以写入。对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> 如果chunk
是字符串,则为编码callback
<Function> 流完成时的回调。- 返回: <this>
调用 writable.end()
方法表示不会再向 Writable
写入数据。可选的 chunk
和 encoding
参数允许在关闭流之前立即写入最后一部分附加数据。
在调用 stream.end()
之后调用 stream.write()
方法将引发错误。
// 写入 'hello, ' 然后以 'world!' 结尾。
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// 现在不允许再写入!
writable.setDefaultEncoding(encoding)
[历史]
版本 | 更改 |
---|---|
v6.1.0 | 此方法现在返回对 writable 的引用。 |
v0.11.15 | 添加到: v0.11.15 |
writable.setDefaultEncoding()
方法设置 Writable
流的默认 encoding
。
writable.uncork()
添加到: v0.11.2
writable.uncork()
方法刷新自调用 stream.cork()
以来缓冲的所有数据。
当使用 writable.cork()
和 writable.uncork()
来管理对流的写入缓冲时,使用 process.nextTick()
推迟对 writable.uncork()
的调用。这样做允许在给定的 Node.js 事件循环阶段对所有 writable.write()
调用进行批处理。
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
如果在流上多次调用 writable.cork()
方法,则必须调用相同数量的 writable.uncork()
调用才能刷新缓冲的数据。
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// 数据在第二次调用 uncork() 之前不会被刷新。
stream.uncork()
})
另请参阅:writable.cork()
。
writable.writable
添加到: v11.4.0
如果可以安全地调用 writable.write()
,则为 true
,这意味着流尚未被销毁、出错或结束。
writable.writableAborted
添加到: v18.0.0, v16.17.0
返回在发出 'finish'
之前流是否被销毁或出错。
writable.writableEnded
添加到: v12.9.0
在调用 writable.end()
之后为 true
。此属性并不表示数据是否已被刷新,为此请使用 writable.writableFinished
。
writable.writableCorked
添加到: v13.2.0, v12.16.0
需要调用 writable.uncork()
的次数才能完全取消流的 cork。
writable.errored
添加到: v18.0.0
如果流已使用错误被销毁,则返回错误。
writable.writableFinished
添加到: v12.6.0
在发出 'finish'
事件之前立即设置为 true
。
writable.writableHighWaterMark
添加到: v9.3.0
返回创建此 Writable
时传递的 highWaterMark
的值。
writable.writableLength
添加到: v9.4.0
此属性包含队列中准备写入的字节(或对象)数。该值提供有关 highWaterMark
状态的内省数据。
writable.writableNeedDrain
添加到: v15.2.0, v14.17.0
如果流的缓冲区已满并且流将发出 'drain'
,则为 true
。
writable.writableObjectMode
添加到: v12.3.0
给定 Writable
流的属性 objectMode
的 Getter。
writable[Symbol.asyncDispose]()
添加到: v22.4.0, v20.16.0
使用 AbortError
调用 writable.destroy()
,并返回一个在流完成时完成的 promise。
writable.write(chunk[, encoding][, callback])
[历史]
版本 | 更改 |
---|---|
v22.0.0, v20.13.0 | chunk 参数现在可以是 TypedArray 或 DataView 实例。 |
v8.0.0 | chunk 参数现在可以是 Uint8Array 实例。 |
v6.0.0 | 现在将 null 作为 chunk 参数传递始终被认为无效,即使在对象模式下也是如此。 |
v0.9.4 | 添加到: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可选数据以写入。对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> | <null> 如果chunk
是字符串,则为编码。默认:'utf8'
callback
<Function> 此数据块刷新时的回调。- 返回: <boolean> 如果流希望调用代码等待发出
'drain'
事件后再继续写入附加数据,则为false
;否则为true
。
writable.write()
方法将一些数据写入流,并在数据完全处理后调用提供的 callback
。如果发生错误,则 callback
将以错误作为其第一个参数被调用。callback
是异步调用的,并且在发出 'error'
之前调用。
如果在接纳 chunk
后内部缓冲区小于创建流时配置的 highWaterMark
,则返回值为 true
。如果返回 false
,则应停止进一步尝试向流写入数据,直到发出 'drain'
事件。如果 write()
返回 false,请不要在发出 'drain'
事件之前写入更多块。虽然允许在流未排空时调用 write()
,但 Node.js 将缓冲所有写入的块,直到达到最大内存使用量,此时它将无条件中止。即使在它中止之前,高内存使用率也会导致垃圾收集器性能下降和高 RSS(即使不再需要内存,通常也不会释放回系统)。由于如果远程对等方没有读取数据,TCP 套接字可能永远不会排空,因此写入未排空的套接字可能会导致可远程利用的漏洞。
在流未排空时写入数据对于 Transform
来说尤其成问题,因为 Transform
流默认情况下会暂停,直到它们被管道传输或添加 'data'
或 'readable'
事件处理程序。
如果要写入的数据可以按需生成或获取,建议将其逻辑封装到 Readable
中并使用 stream.pipe()
。但是,如果更喜欢调用 write()
,则可以使用 'drain'
事件来尊重背压并避免内存问题:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// 等待 cb 被调用后再进行任何其他写入。
write('hello', () => {
console.log('写入已完成,现在可以进行更多写入。')
})
处于对象模式的可写流将始终忽略 encoding
参数。
可读流
可读流是对数据的来源的抽象,从中可以消费数据。
Readable
流的示例包括:
所有 Readable
流都实现了 stream.Readable
类定义的接口。
两种读取模式
Readable
流实际上运行在两种模式之一:流式和暂停。这些模式与对象模式不同。Readable
流可以处于对象模式或非对象模式,而不管它是否处于流式模式或暂停模式。
- 在流式模式下,数据会自动从底层系统读取,并尽可能快地通过
EventEmitter
接口的事件提供给应用程序。 - 在暂停模式下,必须显式调用
stream.read()
方法从流中读取数据块。
所有 Readable
流都以暂停模式开始,但可以通过以下几种方式切换到流式模式:
- 添加
'data'
事件处理程序。 - 调用
stream.resume()
方法。 - 调用
stream.pipe()
方法将数据发送到Writable
。
Readable
可以使用以下方法之一切换回暂停模式:
- 如果没有管道目标,则调用
stream.pause()
方法。 - 如果有管道目标,则移除所有管道目标。可以通过调用
stream.unpipe()
方法移除多个管道目标。
需要记住的重要概念是,Readable
不会生成数据,除非提供了一种消费或忽略该数据的机制。如果消费机制被禁用或移除,Readable
将尝试停止生成数据。
出于向后兼容性的原因,移除 'data'
事件处理程序不会自动暂停流。此外,如果存在管道目标,则调用 stream.pause()
并不能保证流在这些目标清空并请求更多数据后会保持暂停状态。
如果将 Readable
切换到流式模式并且没有可用的使用者来处理数据,则该数据将丢失。例如,当调用 readable.resume()
方法时没有附加到 'data'
事件的侦听器,或者当从流中移除 'data'
事件处理程序时,就会发生这种情况。
添加 'readable'
事件处理程序会自动使流停止流动,并且必须通过 readable.read()
来使用数据。如果移除 'readable'
事件处理程序,则如果存在 'data'
事件处理程序,流将再次开始流动。
三种状态
Readable
流的“两种模式”是对 Readable
流内部实现中更复杂的内部状态管理的简化抽象。
具体来说,在任何给定时间点,每个 Readable
都处于三种可能状态之一:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
当 readable.readableFlowing
为 null
时,没有提供消耗流数据的机制。因此,流不会生成数据。在此状态下,附加 'data'
事件的监听器,调用 readable.pipe()
方法或调用 readable.resume()
方法将把 readable.readableFlowing
切换为 true
,导致 Readable
开始主动发出数据生成的事件。
调用 readable.pause()
、readable.unpipe()
或接收背压将导致 readable.readableFlowing
设置为 false
,暂时停止事件流,但不会停止数据生成。在此状态下,附加 'data'
事件的监听器不会将 readable.readableFlowing
切换为 true
。
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing 现在为 false。
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing 仍然为 false。
pass.write('ok') // 不会发出 'data' 事件。
pass.resume() // 必须调用此方法才能使流发出 'data' 事件。
// readableFlowing 现在为 true。
当 readable.readableFlowing
为 false
时,数据可能会累积在流的内部缓冲区中。
选择一种 API 风格
Readable
流 API 在多个 Node.js 版本中不断发展,并提供多种方法来消费流数据。一般来说,开发者应该选择 一种 数据消费方法,并且绝不应该使用多种方法来从单个流中消费数据。具体来说,结合使用 on('data')
、on('readable')
、pipe()
或异步迭代器可能会导致难以预料的行为。
类: stream.Readable
添加到:v0.9.4
事件: 'close'
[历史]
版本 | 更改 |
---|---|
v10.0.0 | 添加 emitClose 选项以指定是否在销毁时发出 'close' 。 |
v0.9.4 | 添加到:v0.9.4 |
当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close'
事件。该事件表示不会再发出任何事件,也不会发生进一步的计算。
如果使用 emitClose
选项创建 Readable
流,则该流将始终发出 'close'
事件。
事件: 'data'
添加到:v0.9.4
chunk
<Buffer> | <string> | <any> 数据块。对于非对象模式操作的流,数据块将是字符串或Buffer
。对于处于对象模式的流,数据块可以是任何 JavaScript 值,但null
除外。
每当流将数据块的所有权交给消费者时,都会发出 'data'
事件。这可能发生在通过调用 readable.pipe()
、readable.resume()
或将监听器回调附加到 'data'
事件时,流以流式模式切换时。每当调用 readable.read()
方法并且有可返回的数据块时,也会发出 'data'
事件。
将 'data'
事件监听器附加到尚未显式暂停的流将使流进入流式模式。然后,数据将在可用时立即传递。
如果使用 readable.setEncoding()
方法为流指定了默认编码,则监听器回调将把数据块作为字符串传递;否则,数据将作为 Buffer
传递。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
事件: 'end'
添加到:v0.9.4
当流中没有更多数据可供使用时,将发出 'end'
事件。
除非数据完全被使用,否则不会发出 'end'
事件。这可以通过将流切换到流式模式,或者重复调用 stream.read()
直到所有数据都被使用来实现。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
console.log('There will be no more data.')
})
事件: 'error'
添加到:v0.9.4
Readable
实现随时可能发出 'error'
事件。通常,如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效的数据块时,可能会发生这种情况。
监听器回调将传递单个 Error
对象。
事件: 'pause'
添加到:v0.9.4
当调用 stream.pause()
且 readableFlowing
不为 false
时,将发出 'pause'
事件。
事件: 'readable'
[历史]
版本 | 更改 |
---|---|
v10.0.0 | 'readable' 始终在调用 .push() 后的下一个 tick 中发出。 |
v10.0.0 | 使用 'readable' 需要调用 .read() 。 |
v0.9.4 | 添加到:v0.9.4 |
当有数据可供从流中读取时,最多可达配置的高水位线 (state.highWaterMark
),将发出 'readable'
事件。实际上,它表示流在缓冲区中具有新信息。如果缓冲区中有可用数据,则可以调用 stream.read()
来检索该数据。此外,当流的末尾到达时,也可能会发出 'readable'
事件。
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// There is some data to read now.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
如果流的末尾已到达,则调用 stream.read()
将返回 null
并触发 'end'
事件。如果从未有任何数据可供读取,这也是正确的。例如,在以下示例中,foo.txt
是一个空文件:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
运行此脚本的输出为:
$ node test.js
readable: null
end
在某些情况下,附加 'readable'
事件的监听器将导致将一定数量的数据读取到内部缓冲区中。
一般来说,readable.pipe()
和 'data'
事件机制比 'readable'
事件更容易理解。但是,处理 'readable'
可能会提高吞吐量。
如果同时使用 'readable'
和 'data'
,则 'readable'
优先控制流,即只有在调用 stream.read()
时才会发出 'data'
。readableFlowing
属性将变为 false
。如果在删除 'readable'
时存在 'data'
监听器,则流将开始流动,即 'data'
事件将在不调用 .resume()
的情况下发出。
事件: 'resume'
添加到:v0.9.4
当调用 stream.resume()
且 readableFlowing
不为 true
时,将发出 'resume'
事件。
readable.destroy([error])
[历史]
版本 | 更改 |
---|---|
v14.0.0 | 在已销毁的流上充当空操作。 |
v8.0.0 | 添加到:v8.0.0 |
销毁流。可以选择发出 'error'
事件,并发出 'close'
事件(除非 emitClose
设置为 false
)。此调用之后,可读流将释放任何内部资源,后续对 push()
的调用将被忽略。
一旦调用了 destroy()
,任何进一步的调用都将是空操作,并且除了来自 _destroy()
的错误之外,不会再发出任何错误作为 'error'
。
实现者不应覆盖此方法,而应实现 readable._destroy()
。
readable.closed
添加到:v18.0.0
在发出 'close'
后为 true
。
readable.destroyed
添加到:v8.0.0
调用 readable.destroy()
后为 true
。
readable.isPaused()
添加到:v0.11.14
- 返回: <boolean>
readable.isPaused()
方法返回 Readable
的当前运行状态。这主要由 readable.pipe()
方法的基础机制使用。在大多数典型情况下,没有理由直接使用此方法。
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
添加到:v0.9.4
- 返回: <this>
readable.pause()
方法将导致处于流式模式的流停止发出 'data'
事件,从而退出流式模式。任何可用数据都将保留在内部缓冲区中。
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
readable.pause()
console.log('There will be no additional data for 1 second.')
setTimeout(() => {
console.log('Now data will start flowing again.')
readable.resume()
}, 1000)
})
如果存在 'readable'
事件监听器,则 readable.pause()
方法无效。
readable.pipe(destination[, options])
添加到:v0.9.4
destination
<stream.Writable> 用于写入数据的目标options
<Object> 管道选项end
<boolean> 读取器结束时结束写入器。默认值:true
。
返回: <stream.Writable> 目标,如果它是
Duplex
或Transform
流,则允许管道链
readable.pipe()
方法将 Writable
流附加到 readable
,使其自动切换到流式模式并将所有数据推送到附加的 Writable
。数据流将自动管理,这样目标 Writable
流不会被更快的 Readable
流淹没。
以下示例将 readable
中的所有数据写入名为 file.txt
的文件中:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// All the data from readable goes into 'file.txt'.
readable.pipe(writable)
可以将多个 Writable
流附加到单个 Readable
流。
readable.pipe()
方法返回对 目标 流的引用,从而可以设置管道流的链:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
默认情况下,当源 Readable
流发出 'end'
时,会在目标 Writable
流上调用 stream.end()
,以便目标不再可写。要禁用此默认行为,可以将 end
选项传递为 false
,使目标流保持打开状态:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
一个重要的警告是,如果 Readable
流在处理过程中发出错误,则目标 Writable
不会自动关闭。如果发生错误,则必须手动关闭每个流才能防止内存泄漏。
process.stderr
和 process.stdout
Writable
流直到 Node.js 进程退出才关闭,而不管指定的选项如何。
readable.read([size])
添加到:v0.9.4
readable.read()
方法从内部缓冲区读取数据并将其返回。如果没有可用数据可供读取,则返回 null
。默认情况下,数据以 Buffer
对象的形式返回,除非使用 readable.setEncoding()
方法指定了编码,或者流以对象模式运行。
可选的 size
参数指定要读取的特定字节数。如果无法读取 size
字节,则返回 null
,除非流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。
如果未指定 size
参数,则将返回内部缓冲区中包含的所有数据。
size
参数必须小于或等于 1 GiB。
readable.read()
方法只能在暂停模式下运行的 Readable
流上调用。在流式模式下,readable.read()
会自动调用,直到内部缓冲区完全清空。
const readable = getReadableStreamSomehow()
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk
console.log('Stream is readable (new data received in buffer)')
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`)
}
})
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.')
})
每次调用 readable.read()
都返回一个数据块或 null
,表示此时没有更多数据可读。这些块不会自动连接。因为单个 read()
调用不会返回所有数据,所以可能需要使用 while 循环来连续读取块,直到检索所有数据。读取大型文件时,.read()
可能会暂时返回 null
,这表示它已使用所有缓冲内容,但可能还有更多数据有待缓冲。在这种情况下,一旦缓冲区中有更多数据,就会发出新的 'readable'
事件,'end'
事件表示数据传输结束。
因此,要从 readable
中读取文件的全部内容,需要在多个 'readable'
事件中收集块:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
处于对象模式的 Readable
流将始终从对 readable.read(size)
的调用中返回单个项目,而不管 size
参数的值如何。
如果 readable.read()
方法返回数据块,则还会发出 'data'
事件。
在发出 'end'
事件后调用 stream.read([size])
将返回 null
。不会引发运行时错误。
readable.readable
添加到:v11.4.0
如果可以安全地调用 readable.read()
,则为 true
,这意味着流尚未销毁或发出 'error'
或 'end'
。
readable.readableAborted
添加到:v16.8.0
返回在发出 'end'
之前流是否已销毁或出错。
readable.readableDidRead
添加到:v16.7.0, v14.18.0
返回是否已发出 'data'
。
readable.readableEncoding
添加到:v12.7.0
给定 Readable
流的属性 encoding
的 Getter。可以使用 readable.setEncoding()
方法设置 encoding
属性。
readable.readableEnded
添加到:v12.9.0
发出 'end'
事件时变为 true
。
readable.errored
添加到:v18.0.0
如果流已使用错误销毁,则返回错误。
readable.readableFlowing
添加到:v9.4.0
此属性反映了 三种状态 部分中描述的 Readable
流的当前状态。
readable.readableHighWaterMark
添加到:v9.3.0
返回创建此 Readable
时传递的 highWaterMark
值。
readable.readableLength
添加到:v9.4.0
此属性包含队列中准备读取的字节数(或对象数)。该值提供有关 highWaterMark
状态的内省数据。
readable.readableObjectMode
添加到:v12.3.0
给定 Readable
流的属性 objectMode
的 Getter。
readable.resume()
[历史]
版本 | 更改 |
---|---|
v10.0.0 | 如果存在 'readable' 事件监听器,则 resume() 无效。 |
v0.9.4 | 添加到:v0.9.4 |
- 返回: <this>
readable.resume()
方法使显式暂停的 Readable
流恢复发出 'data'
事件,并将流切换到流式模式。
readable.resume()
方法可用于完全使用流中的数据,而无需实际处理任何数据:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.')
})
如果存在 'readable'
事件监听器,则 readable.resume()
方法无效。
readable.setEncoding(encoding)
添加到:v0.9.4
readable.setEncoding()
方法设置从 Readable
流读取数据的字符编码。
默认情况下,不分配任何编码,流数据将作为 Buffer
对象返回。设置编码会导致流数据作为指定编码的字符串而不是 Buffer
对象返回。例如,调用 readable.setEncoding('utf8')
将使输出数据被解释为 UTF-8 数据,并作为字符串传递。调用 readable.setEncoding('hex')
将使数据以十六进制字符串格式编码。
Readable
流将正确处理通过流传递的多字节字符,否则如果只是从流中作为 Buffer
对象提取,这些字符将被解码不正确。
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Got %d characters of string data:', chunk.length)
})
readable.unpipe([destination])
添加到:v0.9.4
destination
<stream.Writable> 要取消管道的特定流(可选)- 返回: <this>
readable.unpipe()
方法分离先前使用 stream.pipe()
方法附加的 Writable
流。
如果未指定 destination
,则分离所有管道。
如果指定了 destination
,但没有为其设置管道,则该方法不执行任何操作。
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable)
setTimeout(() => {
console.log('Stop writing to file.txt.')
readable.unpipe(writable)
console.log('Manually close the file stream.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[历史]
版本 | 更改 |
---|---|
v22.0.0, v20.13.0 | chunk 参数现在可以是 TypedArray 或 DataView 实例。 |
v8.0.0 | chunk 参数现在可以是 Uint8Array 实例。 |
v0.9.11 | 添加到:v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要移入读取队列的数据块。对于非对象模式操作的流,chunk
必须是 <string>、<Buffer>、<TypedArray>、<DataView> 或null
。对于对象模式流,chunk
可以是任何 JavaScript 值。encoding
<string> 字符串块的编码。必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。
将 chunk
作为 null
传递表示流的结尾 (EOF),其行为与 readable.push(null)
相同,之后无法再写入任何数据。EOF 信号位于缓冲区的末尾,任何缓冲数据仍将被刷新。
readable.unshift()
方法将数据块推回到内部缓冲区中。这在某些情况下非常有用,在这些情况下,流由需要“取消使用”它已乐观地从源中提取的数据量的代码使用,以便可以将数据传递给其他方。
在发出 'end'
事件后,不能调用 stream.unshift(chunk)
方法,否则将引发运行时错误。
使用 stream.unshift()
的开发人员通常应该考虑改用 Transform
流。有关更多信息,请参阅 流实现者的 API 部分。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Now the body of the message can be read from the stream.
callback(null, header, stream)
return
}
// Still reading the header.
header += str
}
}
}
与 stream.push(chunk)
不同,stream.unshift(chunk)
不会通过重置流的内部读取状态来结束读取过程。如果在读取过程中(即在自定义流上的 stream._read()
实现中)调用 readable.unshift()
,这可能会导致意外的结果。在调用 readable.unshift()
后立即使用 stream.push('')
将适当地重置读取状态,但是最好是在执行读取的过程中避免调用 readable.unshift()
。
readable.wrap(stream)
添加到:v0.9.4
在 Node.js 0.10 之前,流并未实现当前定义的整个 node:stream
模块 API。(有关更多信息,请参阅 兼容性)。
当使用发出 'data'
事件并且仅具有建议性的 stream.pause()
方法的旧版 Node.js 库时,可以使用 readable.wrap()
方法创建一个使用旧流作为其数据源的 Readable
流。
很少需要使用 readable.wrap()
,但该方法已作为与旧版 Node.js 应用程序和库交互的便利功能提供。
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // etc.
})
readable[Symbol.asyncIterator]()
[历史]
版本 | 更改 |
---|---|
v11.14.0 | Symbol.asyncIterator 支持不再是实验性的。 |
v10.0.0 | 添加到:v10.0.0 |
- 返回: 用于完全使用流的 <AsyncIterator>。
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
如果循环使用 break
、return
或 throw
终止,则流将被销毁。换句话说,遍历流将完全使用流。流将以等于 highWaterMark
选项的大小分块读取。在上面的代码示例中,如果文件的数据少于 64 KiB,则数据将位于单个块中,因为没有向 fs.createReadStream()
提供 highWaterMark
选项。
readable[Symbol.asyncDispose]()
添加到:v20.4.0, v18.18.0
使用 AbortError
调用 readable.destroy()
,并返回一个在流完成时完成的 promise。
readable.compose(stream[, options])
添加到:v19.1.0, v18.13.0
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> 如果信号被中止,则允许销毁流。
返回: 使用流
stream
组合的 <Duplex> 流。
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // prints ['this', 'is', 'compose', 'as', 'operator']
有关更多信息,请参阅 stream.compose
。
readable.iterator([options])
**添加到:
双工流和转换流
类:stream.Duplex
[历史]
版本 | 变更 |
---|---|
v6.8.0 | Duplex 的实例在检查 instanceof stream.Writable 时现在返回 true 。 |
v0.9.4 | 新增于:v0.9.4 |
双工流是同时实现 Readable
和 Writable
接口的流。
Duplex
流的示例包括:
duplex.allowHalfOpen
新增于:v0.9.4
如果为 false
,则当可读端结束时,流将自动结束可写端。最初由 allowHalfOpen
构造函数选项设置,默认为 true
。
可以手动更改此选项来更改现有 Duplex
流实例的半打开行为,但必须在发出 'end'
事件之前更改。
类: stream.Transform
添加到: v0.9.4
转换流是 Duplex
流,其中输出与输入以某种方式相关。与所有 Duplex
流一样,Transform
流同时实现了 Readable
和 Writable
接口。
Transform
流的示例包括:
transform.destroy([error])
[历史]
版本 | 更改 |
---|---|
v14.0.0 | 在已销毁的流上作为无操作工作。 |
v8.0.0 | 添加到:v8.0.0 |
销毁流,并可选地发出 'error'
事件。此调用之后,转换流将释放任何内部资源。实现者不应覆盖此方法,而应实现 readable._destroy()
。Transform
的 _destroy()
的默认实现也会发出 'close'
,除非 emitClose
设置为 false。
一旦调用了 destroy()
,任何进一步的调用都将是无操作的,并且除了 _destroy()
之外,不会发出任何其他错误作为 'error'
。
stream.duplexPair([options])
新增于:v22.6.0, v20.17.0
实用函数 duplexPair
返回一个包含两个项目的数组,每个项目都是一个与另一端连接的 Duplex
流:
const [sideA, sideB] = duplexPair()
写入一个流的内容在另一个流中变为可读。它提供了类似于网络连接的行为,客户端写入的数据可以被服务器读取,反之亦然。
Duplex 流是对称的;可以使用其中一个,而不会有任何行为差异。
stream.finished(stream[, options], callback)
[历史]
版本 | 变更 |
---|---|
v19.5.0 | 添加了对 ReadableStream 和 WritableStream 的支持。 |
v15.11.0 | 添加了 signal 选项。 |
v14.0.0 | finished(stream, cb) 将在调用回调之前等待 'close' 事件。实现尝试检测旧版流,并且仅将此行为应用于预期会发出 'close' 的流。 |
v14.0.0 | 在 Readable 流上发出 'close' 事件早于 'end' 事件将导致 ERR_STREAM_PREMATURE_CLOSE 错误。 |
v14.0.0 | 在调用 finished(stream, cb) 之前已完成的流上将调用回调。 |
v10.0.0 | 新增于:v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> 可读和/或可写流/web 流。options
<Object>error
<boolean> 如果设置为false
,则对emit('error', err)
的调用不被视为已完成。默认值:true
。readable
<boolean> 当设置为false
时,即使流可能仍然可读,回调也会在流结束时被调用。默认值:true
。writable
<boolean> 当设置为false
时,即使流可能仍然可写,回调也会在流结束时被调用。默认值:true
。signal
<AbortSignal> 允许中止等待流完成。如果信号被中止,底层流 不会 被中止。回调将使用AbortError
被调用。此函数添加的所有注册侦听器也将被删除。
callback
<Function> 一个回调函数,它接受一个可选的错误参数。返回值: <Function> 一个清理函数,用于移除所有已注册的监听器。
一个用于在流不再可读、可写或遇到错误或过早关闭事件时收到通知的函数。
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream failed.', err)
} else {
console.log('Stream is done reading.')
}
})
rs.resume() // 冲洗流。
尤其适用于流过早销毁(例如中止的 HTTP 请求)并且不会发出 'end'
或 'finish'
的错误处理场景。
finished
API 提供了 Promise 版本。
stream.finished()
在调用 callback
后会留下悬空的事件监听器(特别是 'error'
、'end'
、'finish'
和 'close'
)。这样做的原因是为了防止意外的 'error'
事件(由于不正确的流实现)导致意外崩溃。如果这是不需要的行为,则需要在回调中调用返回的清理函数:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[历史]
版本 | 变更 |
---|---|
v19.7.0, v18.16.0 | 新增对 Web Streams 的支持。 |
v18.0.0 | 将无效回调传递给 callback 参数现在会抛出 ERR_INVALID_ARG_TYPE 而不是 ERR_INVALID_CALLBACK 。 |
v14.0.0 | pipeline(..., cb) 将在调用回调之前等待 'close' 事件。实现尝试检测旧版流,并且只对预期会发出 'close' 的流应用此行为。 |
v13.10.0 | 新增对异步生成器的支持。 |
v10.0.0 | 首次引入: v10.0.0 |
streams
<流[]> | <迭代器[]> | <异步迭代器[]> | <函数[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<流> | <迭代器> | <异步迭代器> | <函数> | <ReadableStream>...transforms
<流> | <函数> | <TransformStream>destination
<流> | <函数> | <WritableStream>callback
<函数> 管道完全完成时调用。err
<错误>val
destination
返回的Promise
的已解析值。
返回值: <流>
一个模块方法,用于在流和生成器之间进行管道传输,转发错误并正确清理,并在管道完成时提供回调。
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// 使用 pipeline API 可以轻松地将一系列流
// 连接在一起,并在管道完全完成时收到通知。
// 一个高效地 gzip 一个可能很大的 tar 文件的管道:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('管道失败。', err)
} else {
console.log('管道成功。')
}
})
pipeline
API 提供了一个 Promise 版本。
stream.pipeline()
将调用 stream.destroy(err)
关闭所有流,除了:
- 已发出
'end'
或'close'
事件的可读流。 - 已发出
'finish'
或'close'
事件的可写流。
stream.pipeline()
在回调调用后会在流上留下悬空事件监听器。如果在失败后重用流,这会导致事件监听器泄漏和错误被吞没。如果最后一个流是可读的,则会移除悬空事件监听器,以便稍后可以再次使用最后一个流。
stream.pipeline()
在发生错误时关闭所有流。IncomingRequest
与 pipeline
一起使用可能会导致意外行为,因为它会在没有发送预期响应的情况下销毁套接字。请参见下面的示例:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // 没有此文件
// 一旦 `pipeline` 已销毁套接字,此消息就无法发送
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[历史]
版本 | 变更 |
---|---|
v21.1.0, v20.10.0 | 添加了对流类的支持。 |
v19.8.0, v18.16.0 | 添加了对 Web Streams 的支持。 |
v16.9.0 | 在 v16.9.0 中添加 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 返回值: <stream.Duplex>
将两个或多个流组合成一个 Duplex
流,该流写入第一个流并从最后一个流读取。每个提供的流都使用 stream.pipeline
连接到下一个流。如果任何流发生错误,则所有流都将被销毁,包括外部 Duplex
流。
因为 stream.compose
返回一个新的流,该流又可以(并且应该)连接到其他流,所以它实现了组合。相比之下,当将流传递给 stream.pipeline
时,第一个流通常是可读流,最后一个流是可写流,形成一个闭环。
如果传入一个 Function
,它必须是一个工厂方法,接受一个 source
Iterable
。
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // 输出 'HELLOWORLD'
stream.compose
可用于将异步迭代器、生成器和函数转换为流。
AsyncIterable
转换为可读的Duplex
。不能产生null
。AsyncGeneratorFunction
转换为可读/可写的转换Duplex
。必须以源AsyncIterable
作为第一个参数。不能产生null
。AsyncFunction
转换为可写的Duplex
。必须返回null
或undefined
。
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// 将 AsyncIterable 转换为可读的 Duplex。
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// 将 AsyncGenerator 转换为转换 Duplex。
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// 将 AsyncFunction 转换为可写的 Duplex。
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // 输出 'HELLOWORLD'
参见 readable.compose(stream)
以了解 stream.compose
作为运算符的使用方法。
stream.Readable.from(iterable[, options])
新增于: v12.3.0, v10.17.0
iterable
<Iterable> 实现Symbol.asyncIterator
或Symbol.iterator
可迭代协议的对象。如果传入空值,则会发出 'error' 事件。options
<Object> 提供给new stream.Readable([options])
的选项。默认情况下,Readable.from()
会将options.objectMode
设置为true
,除非通过将options.objectMode
设置为false
明确选择退出。- 返回值: <stream.Readable>
一个用于根据迭代器创建可读流的实用方法。
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
出于性能原因,调用 Readable.from(string)
或 Readable.from(buffer)
不会迭代字符串或缓冲区以匹配其他流的语义。
如果传入包含 Promise 的 Iterable
对象作为参数,则可能会导致未处理的拒绝。
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未处理的拒绝
])
stream.Readable.fromWeb(readableStream[, options])
新增于:v17.0.0
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
返回值: <stream.Readable>
stream.Readable.isDisturbed(stream)
新增于:v16.8.0
stream
<stream.Readable> | <ReadableStream>- 返回值:
boolean
返回流是否已被读取或取消。
stream.isErrored(stream)
新增于:v17.3.0, v16.14.0
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 返回值:<boolean>
返回流是否遇到错误。
stream.isReadable(stream)
新增于:v17.4.0, v16.14.0
stream
<Readable> | <Duplex> | <ReadableStream>- 返回值:<boolean>
返回流是否可读。
stream.Readable.toWeb(streamReadable[, options])
新增于: v17.0.0
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 在对给定的stream.Readable
应用背压读取之前,创建的ReadableStream
的最大内部队列大小。如果未提供值,则将从给定的stream.Readable
获取。size
<Function> 用于计算给定数据块大小的函数。如果未提供值,则所有块的大小都将为1
。chunk
<any>- 返回值: <number>
返回值: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
新增于: v17.0.0
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
返回值: <stream.Writable>
stream.Writable.toWeb(streamWritable)
新增于: v17.0.0
streamWritable
<stream.Writable>- 返回值: <WritableStream>
stream.Duplex.from(src)
[历史]
版本 | 变更 |
---|---|
v19.5.0, v18.17.0 | src 参数现在可以是 ReadableStream 或 WritableStream 。 |
v16.8.0 | 新增于:v16.8.0 |
src
<流> | <Blob> | <ArrayBuffer> | <字符串> | <可迭代对象> | <异步可迭代对象> | <异步生成器函数> | <异步函数> | <Promise> | <对象> | <ReadableStream> | <WritableStream>
用于创建双向流的实用方法。
Stream
将可写流转换为可写的Duplex
,并将可读流转换为Duplex
。Blob
转换为可读Duplex
。string
转换为可读Duplex
。ArrayBuffer
转换为可读Duplex
。AsyncIterable
转换为可读Duplex
。不能产生null
。AsyncGeneratorFunction
转换为可读/可写的转换Duplex
。必须以源AsyncIterable
作为第一个参数。不能产生null
。AsyncFunction
转换为可写的Duplex
。必须返回null
或undefined
Object ({ writable, readable })
将readable
和writable
转换为Stream
,然后将它们组合成Duplex
,其中Duplex
将写入writable
并从readable
读取。Promise
转换为可读Duplex
。值null
将被忽略。ReadableStream
转换为可读Duplex
。WritableStream
转换为可写的Duplex
。- 返回值: <stream.Duplex>
如果传入包含 Promise 的 Iterable
对象作为参数,则可能会导致未处理的拒绝。
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 未处理的拒绝
])
stream.Duplex.fromWeb(pair[, options])
新增于: v17.0.0
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>返回值: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
新增于: v17.0.0
[稳定性: 1 - 实验性]
稳定性: 1 - 实验性
streamDuplex
<stream.Duplex>- 返回值: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[历史]
版本 | 变更 |
---|---|
v19.7.0, v18.16.0 | 添加了对 ReadableStream 和 WritableStream 的支持。 |
v15.4.0 | 首次引入: v15.4.0 |
signal
<AbortSignal> 表示可能取消的信号stream
<Stream> | <ReadableStream> | <WritableStream> 要附加信号的流。
将 AbortSignal
附加到可读或可写流。这允许代码使用 AbortController
来控制流的销毁。
调用传递的 AbortSignal
对应的 AbortController
上的 abort
方法,其行为与在流上调用 .destroy(new AbortError())
相同,对于 Web 流则为 controller.error(new AbortError())
。
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// 稍后,中止操作并关闭流
controller.abort()
或者使用带有可读流作为异步迭代器的 AbortSignal
:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // 设置超时
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// 操作被取消
} else {
throw e
}
}
})()
或者使用带有 ReadableStream
的 AbortSignal
:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// 操作被取消
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
新增于: v19.9.0, v18.17.0
返回流使用的默认 highWaterMark。默认为 65536
(64 KiB),objectMode
为 true
时为 16
。
stream.setDefaultHighWaterMark(objectMode, value)
新增于: v19.9.0, v18.17.0
设置流使用的默认 highWaterMark。
流实现者的 API
node:stream
模块的 API 旨在使使用 JavaScript 的原型继承模型轻松实现流成为可能。
首先,流开发者将声明一个扩展四个基本流类之一 (stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
) 的新 JavaScript 类,确保调用相应的父类构造函数:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
扩展流时,请记住用户可以在将这些选项转发给基构造函数之前提供哪些选项以及应该提供哪些选项。例如,如果实现对 autoDestroy
和 emitClose
选项做出了假设,则不允许用户覆盖这些选项。明确说明转发了哪些选项,而不是隐式地转发所有选项。
然后,新的流类必须实现一个或多个特定方法,具体取决于正在创建的流的类型,如下表所述:
用例 | 类 | 要实现的方法 |
---|---|---|
只读 | Readable | _read() |
只写 | Writable | _write() , _writev() , _final() |
读写 | Duplex | _read() , _write() , _writev() , _final() |
对写入的数据进行操作,然后读取结果 | Transform | _transform() , _flush() , _final() |
流的实现代码绝不应调用专供使用者使用的流的“公共”方法(如流使用者的 API 部分所述)。这样做可能会导致使用该流的应用程序代码出现不良副作用。
避免覆盖公共方法,例如 write()
、end()
、cork()
、uncork()
、read()
和 destroy()
,或通过 .emit()
发出内部事件,例如 'error'
、'data'
、'end'
、'finish'
和 'close'
。这样做可能会破坏当前和未来的流不变性,从而导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。
简化构造
新增于:v1.2.0
对于许多简单的情况,可以无需继承即可创建一个流。这可以通过直接创建 stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
对象的实例并传入适当的方法作为构造函数选项来实现。
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// 初始化状态并加载资源...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// 释放资源...
},
})
实现可写流
stream.Writable
类被扩展以实现一个 Writable
流。
自定义 Writable
流必须调用 new stream.Writable([options])
构造函数并实现 writable._write()
和/或 writable._writev()
方法。
new stream.Writable([options])
[历史]
版本 | 变更 |
---|---|
v22.0.0 | 提高默认 highWaterMark。 |
v15.5.0 | 支持传入 AbortSignal。 |
v14.0.0 | 将 autoDestroy 选项的默认值更改为 true 。 |
v11.2.0, v10.16.0 | 添加 autoDestroy 选项,以便在流发出 'finish' 或错误时自动 destroy() 流。 |
v10.0.0 | 添加 emitClose 选项以指定是否在销毁时发出 'close' 。 |
options
<对象>highWaterMark
<数字> 当stream.write()
开始返回false
时的缓冲区级别。默认值:65536
(64 KiB),或objectMode
流的16
。decodeStrings
<布尔值> 是否将传递给stream.write()
的string
编码为Buffer
(使用stream.write()
调用中指定的编码)然后再将它们传递给stream._write()
。其他类型的数据不会转换(即Buffer
不会解码为string
)。设置为 false 将阻止string
的转换。默认值:true
。defaultEncoding
<字符串> 当没有编码作为参数指定给stream.write()
时使用的默认编码。默认值:'utf8'
。objectMode
<布尔值>stream.write(anyObj)
是否为有效操作。启用此选项后,如果流实现支持,则可以写入除字符串、<Buffer>、<TypedArray> 或 <DataView> 之外的 JavaScript 值。默认值:false
。emitClose
<布尔值> 流是否应该在其被销毁后发出'close'
。默认值:true
。write
<函数>stream._write()
方法的实现。writev
<函数>stream._writev()
方法的实现。destroy
<函数>stream._destroy()
方法的实现。final
<函数>stream._final()
方法的实现。construct
<函数>stream._construct()
方法的实现。autoDestroy
<布尔值> 此流是否应在其结束之后自动调用.destroy()
本身。默认值:true
。signal
<AbortSignal> 表示可能取消的信号。
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// 调用 stream.Writable() 构造函数。
super(options)
// ...
}
}
或者,当使用 pre-ES6 样式构造函数时:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
或者,使用简化的构造函数方法:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
在对应于传递的 AbortSignal
的 AbortController
上调用 abort
将与在可写流上调用 .destroy(new AbortError())
的行为相同。
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// 稍后,中止操作并关闭流
controller.abort()
writable._construct(callback)
新增于:v15.0.0
callback
<Function> 当流初始化完成时调用此函数(可选地带有一个错误参数)。
_construct()
方法绝不能直接调用。子类可以实现它,如果这样做了,则只会由内部的 Writable
类方法调用。
此可选函数将在流构造函数返回后的一个 tick 中被调用,将任何 _write()
、_final()
和 _destroy()
调用延迟到调用 callback
之后。这对于在可以使用流之前初始化状态或异步初始化资源非常有用。
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[历史]
版本 | 变更 |
---|---|
v12.11.0 | 提供 _writev() 时,_write() 可选。 |
chunk
<Buffer> | <string> | <any> 将要写入的Buffer
,它由传递给stream.write()
的string
转换而来。如果流的decodeStrings
选项为false
或流处于对象模式,则不会转换chunk
,它将保持传递给stream.write()
的原始值。encoding
<string> 如果chunk
是字符串,则encoding
是该字符串的字符编码。如果chunk
是Buffer
,或者流处于对象模式,则可以忽略encoding
。callback
<Function> 当提供的chunk
处理完成时调用此函数(可以选择使用错误参数)。
所有 Writable
流实现都必须提供 writable._write()
和/或 writable._writev()
方法来将数据发送到底层资源。
Transform
流提供了 writable._write()
的自身实现。
此函数不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部 Writable
类方法调用。
callback
函数必须在 writable._write()
内部同步调用或异步调用(即不同的 tick),以信号写入成功完成或因错误而失败。如果调用失败,则传递给 callback
的第一个参数必须是 Error
对象;如果写入成功,则为 null
。
在调用 writable._write()
和调用 callback
之间发生的所有 writable.write()
调用都会导致写入的数据被缓冲。当调用 callback
时,流可能会发出 'drain'
事件。如果流实现能够一次处理多个数据块,则应实现 writable._writev()
方法。
如果在构造函数选项中将 decodeStrings
属性显式设置为 false
,则 chunk
将保持与传递给 .write()
的相同对象,并且可能是字符串而不是 Buffer
。这是为了支持对某些字符串数据编码进行优化处理的实现。在这种情况下,encoding
参数将指示字符串的字符编码。否则,可以安全地忽略 encoding
参数。
writable._write()
方法前缀带有一个下划线,因为它对定义它的类是内部的,并且不应该由用户程序直接调用。
writable._writev(chunks, callback)
chunks
<Object[]> 待写入的数据。其值为一个 <Object> 数组,数组中的每个对象代表一个需要写入的离散数据块。这些对象的属性包括:callback
<Function> 一个回调函数(可选地包含一个错误参数),在为提供的块完成处理后调用。
此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Writable
类方法调用。
在能够一次处理多个数据块的流实现中,可以额外实现或替代 writable._write()
实现 writable._writev()
方法。如果已实现,并且存在来自先前写入的缓冲数据,则将调用 _writev()
而不是 _write()
。
writable._writev()
方法以下划线为前缀,因为它对定义它的类是内部的,并且永远不应该被用户程序直接调用。
writable._destroy(err, callback)
新增于: v8.0.0
err
<Error> 一个可能的错误。callback
<Function> 一个回调函数,它可以接收一个可选的错误参数。
_destroy()
方法由 writable.destroy()
调用。子类可以重写它,但绝不能直接调用它。
writable._final(callback)
新增于: v8.0.0
callback
<Function> 完成写入任何剩余数据后调用此函数(可选地带有一个错误参数)。
_final()
方法绝不能直接调用。子类可以实现它,如果实现了,则只会由内部 Writable
类方法调用。
这个可选函数会在流关闭之前被调用,将 'finish'
事件延迟到调用 callback
为止。这对于在流结束之前关闭资源或写入缓冲数据非常有用。
写入错误
在处理 writable._write()
、writable._writev()
和 writable._final()
方法期间发生的错误必须通过调用回调并将错误作为第一个参数传递来传播。从这些方法内部抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
如果 Readable
流在 Writable
发出错误时管道到 Writable
流,则 Readable
流将被取消管道连接。
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
可写流示例
以下说明了一个相当简单(并且有点无用)的自定义 Writable
流实现。虽然这个特定的 Writable
流实例没有任何实际的用途,但该示例说明了自定义 Writable
流实例所需的每个元素:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
可写流中解码缓冲区
解码缓冲区是一项常见任务,例如,当使用输入为字符串的转换器时。当使用多字节字符编码(如 UTF-8)时,这不是一个简单的过程。以下示例显示了如何使用 StringDecoder
和 Writable
解码多字节字符串。
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
实现可读流
stream.Readable
类被扩展用于实现一个 Readable
流。
自定义 Readable
流必须调用 new stream.Readable([options])
构造函数并实现 readable._read()
方法。
new stream.Readable([options])
[历史]
版本 | 变更 |
---|---|
v22.0.0 | 提升默认 highWaterMark。 |
v15.5.0 | 支持传入 AbortSignal。 |
v14.0.0 | 将 autoDestroy 选项的默认值更改为 true 。 |
v11.2.0, v10.16.0 | 添加 autoDestroy 选项,以便在流发出 'end' 或错误时自动 destroy() 流。 |
options
<对象>highWaterMark
<数字> 内部缓冲区中存储的最大字节数,超过此数后将停止从底层资源读取。默认值:65536
(64 KiB),对于objectMode
流则为16
。encoding
<字符串> 如果指定,则缓冲区将使用指定的编码解码为字符串。默认值:null
。objectMode
<布尔值> 此流是否应表现为对象流。这意味着stream.read(n)
返回单个值而不是大小为n
的Buffer
。默认值:false
。emitClose
<布尔值> 流是否在销毁后发出'close'
事件。默认值:true
。read
<函数>stream._read()
方法的实现。destroy
<函数>stream._destroy()
方法的实现。construct
<函数>stream._construct()
方法的实现。autoDestroy
<布尔值> 此流是否应该在其结束之后自动调用.destroy()
方法。默认值:true
。signal
<AbortSignal> 代表可能取消的信号。
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// 调用 stream.Readable(options) 构造函数。
super(options)
// ...
}
}
或者,使用 pre-ES6 样式的构造函数:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
或者,使用简化的构造函数方法:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
对传入的 AbortSignal
对应的 AbortController
调用 abort
将与对创建的可读流调用 .destroy(new AbortError())
的行为相同。
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// 稍后,中止操作并关闭流
controller.abort()
readable._construct(callback)
新增于:v15.0.0
callback
<Function> 当流初始化完成时调用此函数(可选地带有一个错误参数)。
_construct()
方法绝不能直接调用。子类可以实现它,如果实现了,则仅由内部 Readable
类方法调用。
此可选函数将由流构造函数在下个 tick 中调度,将任何 _read()
和 _destroy()
调用延迟到调用 callback
之后。这对于在可以使用流之前初始化状态或异步初始化资源非常有用。
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
新增于:v0.9.4
size
<number> 异步读取的字节数
此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部的 Readable
类方法调用。
所有 Readable
流实现都必须提供 readable._read()
方法的实现,以从底层资源获取数据。
当调用 readable._read()
时,如果资源中有可用数据,则实现应该开始使用 this.push(dataChunk)
方法将数据推入读取队列。一旦流准备好接受更多数据,在每次调用 this.push(dataChunk)
之后,将再次调用 _read()
。_read()
可以继续从资源读取数据并推送数据,直到 readable.push()
返回 false
。只有当 _read()
在停止后再次被调用时,它才能恢复将额外的数据推入队列。
一旦调用了 readable._read()
方法,在通过 readable.push()
方法推送更多数据之前,将不会再次调用它。空数据(例如空缓冲区和字符串)不会导致调用 readable._read()
。
size
参数是建议性的。对于“读取”是一个返回数据的单个操作的实现,可以使用 size
参数来确定要获取多少数据。其他实现可能会忽略此参数,并在数据可用时提供数据。无需等到有 size
字节可用再调用 stream.push(chunk)
。
readable._read()
方法以下划线为前缀,因为它在定义它的类中是内部的,用户程序绝不应该直接调用它。
readable._destroy(err, callback)
新增于: v8.0.0
err
<Error> 一个可能的错误。callback
<Function> 一个回调函数,它接受一个可选的错误参数。
_destroy()
方法由 readable.destroy()
调用。子类可以重写它,但绝不能直接调用它。
readable.push(chunk[, encoding])
[历史]
版本 | 变更 |
---|---|
v22.0.0, v20.13.0 | chunk 参数现在可以是 TypedArray 或 DataView 实例。 |
v8.0.0 | chunk 参数现在可以是 Uint8Array 实例。 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要推入读取队列的数据块。对于非对象模式的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。对于对象模式流,chunk
可以是任何 JavaScript 值。encoding
<string> 字符串块的编码。必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。- 返回值: <boolean>
true
表示可以继续推送更多的数据块;false
表示否则。
当 chunk
是 <Buffer>、<TypedArray>、<DataView> 或 <string> 时,数据块 chunk
将被添加到内部队列中,供流的用户使用。将 chunk
作为 null
传递表示流的结束 (EOF),之后将无法再写入任何数据。
当 Readable
以暂停模式运行时,可以使用 readable.read()
方法在发出 'readable'
事件时读取使用 readable.push()
添加的数据。
当 Readable
以流式模式运行时,使用 readable.push()
添加的数据将通过发出 'data'
事件来传递。
readable.push()
方法的设计尽可能灵活。例如,当包装提供某种暂停/恢复机制和数据回调的较低级别源时,可以使用自定义 Readable
实例包装低级别源:
// `_source` 是一个具有 readStop() 和 readStart() 方法的对象,
// 以及一个在有数据时被调用的 `ondata` 成员,
// 和一个在数据结束时被调用的 `onend` 成员。
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// 每当有数据时,将其推入内部缓冲区。
this._source.ondata = chunk => {
// 如果 push() 返回 false,则停止从源读取。
if (!this.push(chunk)) this._source.readStop()
}
// 当源结束时,推送 EOF 信号 `null` 块。
this._source.onend = () => {
this.push(null)
}
}
// _read() 将在流想要拉取更多数据时被调用。
// 此情况下忽略建议的大小参数。
_read(size) {
this._source.readStart()
}
}
readable.push()
方法用于将内容推入内部缓冲区。它可以由 readable._read()
方法驱动。
对于非对象模式的流,如果 readable.push()
的 chunk
参数为 undefined
,则将其视为空字符串或缓冲区。有关更多信息,请参见 readable.push('')
。
读取时的错误
在处理 readable._read()
期间发生的错误必须通过 readable.destroy(err)
方法传播。从 readable._read()
中抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// 执行某些操作。
}
},
})
一个计数流示例
以下是 Readable
流的基本示例,它按升序发出从 1 到 1,000,000 的数字,然后结束。
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
实现双工流
Duplex
流同时实现了 Readable
和 Writable
,例如 TCP 套接字连接。
由于 JavaScript 不支持多重继承,因此扩展 stream.Duplex
类来实现 Duplex
流(而不是扩展 stream.Readable
和 stream.Writable
类)。
stream.Duplex
类原型继承自 stream.Readable
,并寄生继承自 stream.Writable
,但由于在 stream.Writable
上重写了 Symbol.hasInstance
,instanceof
将对这两个基类都正常工作。
自定义 Duplex
流必须调用 new stream.Duplex([options])
构造函数并同时实现 readable._read()
和 writable._write()
方法。
new stream.Duplex(options)
[历史]
版本 | 变更 |
---|---|
v8.4.0 | 现在支持 readableHighWaterMark 和 writableHighWaterMark 选项。 |
options
<对象> 传递给Writable
和Readable
构造函数。还具有以下字段:allowHalfOpen
<布尔值> 如果设置为false
,则当可读端结束时,流将自动结束可写端。默认值:true
。readable
<布尔值> 设置Duplex
是否应可读。默认值:true
。writable
<布尔值> 设置Duplex
是否应可写。默认值:true
。readableObjectMode
<布尔值> 为流的可读端设置objectMode
。如果objectMode
为true
,则无效。默认值:false
。writableObjectMode
<布尔值> 为流的可写端设置objectMode
。如果objectMode
为true
,则无效。默认值:false
。readableHighWaterMark
<数字> 为流的可读端设置highWaterMark
。如果提供了highWaterMark
,则无效。writableHighWaterMark
<数字> 为流的可写端设置highWaterMark
。如果提供了highWaterMark
,则无效。
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
或者,使用 pre-ES6 样式的构造函数:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
或者,使用简化的构造函数方法:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
使用管道时:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // 接受字符串输入而不是缓冲区
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// 确保是有效的 JSON。
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
一个双工流示例
以下展示了一个简单的Duplex
流示例,它包装了一个假设的低级源对象,可以向该对象写入数据,并从中读取数据,尽管使用的 API 与 Node.js 流不兼容。以下展示了一个简单的Duplex
流示例,它通过Writable
接口缓冲传入的写入数据,并通过Readable
接口读取回数据。
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// 底层源只处理字符串。
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
Duplex
流最重要的方面是,Readable
和Writable
端尽管共存于单个对象实例中,但它们彼此独立运行。
对象模式双工流
对于 Duplex
流,可以使用 readableObjectMode
和 writableObjectMode
选项分别为 Readable
或 Writable
端独占设置 objectMode
。
在下面的示例中,例如,创建一个新的 Transform
流(它是 Duplex
流的一种类型),它具有一个对象模式 Writable
端,该端接受转换为十六进制字符串的 JavaScript 数字在 Readable
端。
const { Transform } = require('node:stream')
// 所有 Transform 流也是 Duplex 流。
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// 根据需要将块强制转换为数字。
chunk |= 0
// 将块转换为其他内容。
const data = chunk.toString(16)
// 将数据推送到可读队列。
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// 打印:01
myTransform.write(10)
// 打印:0a
myTransform.write(100)
// 打印:64
实现转换流
Transform
流是一个 Duplex
流,其中输出通过某种方式从输入计算得出。例如,zlib 流或 crypto 流可以压缩、加密或解密数据。
输出不需要与输入大小相同、块数相同或到达时间相同。例如,Hash
流只会有一个输出块,该块在输入结束时提供。zlib
流产生的输出可能比其输入小得多或大得多。
stream.Transform
类被扩展以实现 Transform
流。
stream.Transform
类原型继承自 stream.Duplex
并实现其自身的 writable._write()
和 readable._read()
方法版本。自定义 Transform
实现必须实现 transform._transform()
方法,并且也可以实现 transform._flush()
方法。
使用 Transform
流时必须小心,因为写入流的数据可能会导致流的可写端暂停,如果可读端的输出未被使用。
new stream.Transform([options])
options
<对象> 传递给Writable
和Readable
构造函数。也包含以下字段:transform
<函数>stream._transform()
方法的实现。flush
<函数>stream._flush()
方法的实现。
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
或者,使用 ES6 之前的构造函数风格:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
或者,使用简化的构造函数方法:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
事件: 'end'
'end'
事件来自 stream.Readable
类。在所有数据输出后发出 'end'
事件,这发生在 transform._flush()
中的回调函数被调用之后。如果发生错误,则不应发出 'end'
事件。
事件: 'finish'
'finish'
事件来自 stream.Writable
类。在调用 stream.end()
并且所有块都已由 stream._transform()
处理之后,将发出 'finish'
事件。如果发生错误,则不应发出 'finish'
事件。
transform._flush(callback)
callback
<Function> 一个回调函数(可选地带有一个错误参数和数据),用于在剩余数据被刷新后调用。
此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Readable
类方法调用。
在某些情况下,转换操作可能需要在流的末尾发出额外的少量数据。例如,zlib
压缩流将存储用于最佳压缩输出的内部状态量。但是,当流结束时,需要刷新这些额外的数据,以便压缩数据完整。
自定义 Transform
实现可以实现 transform._flush()
方法。当没有更多写入的数据要使用时,但在发出 'end'
事件以指示 Readable
流结束之前,将调用此方法。
在 transform._flush()
实现中,可以根据需要调用零次或多次 transform.push()
方法。当刷新操作完成后,必须调用 callback
函数。
transform._flush()
方法以下划线为前缀,因为它是定义它的类的内部方法,用户程序绝不应该直接调用它。
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> 将要转换的Buffer
,由传递给stream.write()
的string
转换而来。如果流的decodeStrings
选项为false
或流处于对象模式,则不会转换chunk
,它将保持传递给stream.write()
的任何值。encoding
<string> 如果chunk
是字符串,则这是编码类型。如果chunk
是缓冲区,则这是特殊值'buffer'
。在这种情况下忽略它。callback
<Function> 一个回调函数(可选地带有一个错误参数和数据),在提供的chunk
被处理后调用。
此函数绝不能由应用程序代码直接调用。它应该由子类实现,并且只能由内部Readable
类方法调用。
所有Transform
流实现必须提供一个_transform()
方法来接受输入并产生输出。transform._transform()
实现处理正在写入的字节,计算输出,然后使用transform.push()
方法将该输出传递给可读部分。
transform.push()
方法可以调用零次或多次以从单个输入块生成输出,这取决于由于块而要输出多少内容。
有可能从任何给定的输入数据块都不会生成输出。
只有当当前块完全被消耗时,才能调用callback
函数。如果在处理输入时发生错误,则传递给callback
的第一个参数必须是Error
对象,否则为null
。如果传递给callback
的第二个参数,它将被转发到transform.push()
方法,但前提是第一个参数为假值。换句话说,以下是等效的:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
transform._transform()
方法以下划线为前缀,因为它对定义它的类是内部的,并且用户程序绝不应该直接调用它。
transform._transform()
从不会并行调用;流实现了一种队列机制,要接收下一个块,必须调用callback
,同步或异步均可。
类: stream.PassThrough
stream.PassThrough
类是对 Transform
流的一种简单的实现,它只是将输入字节传递到输出。其主要用途是用于示例和测试,但在某些情况下,stream.PassThrough
作为新型流的构建块也很有用。
附加说明
流与异步生成器和异步迭代器的兼容性
随着 JavaScript 中对异步生成器和迭代器的支持,异步生成器此时实际上是一种一流的语言级流构造。
下面提供了一些使用 Node.js 流与异步生成器和异步迭代器的常见互操作案例。
使用异步迭代器使用可读流
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
异步迭代器在流上注册一个永久错误处理程序,以防止任何未处理的销毁后错误。
使用异步生成器创建可读流
可以使用 Readable.from()
工具方法从异步生成器创建 Node.js 可读流:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
从异步迭代器管道到可写流
从异步迭代器写入可写流时,请确保正确处理背压和错误。stream.pipeline()
隐藏了背压和背压相关错误的处理:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// 回调模式
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Promise 模式
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
与旧版 Node.js 的兼容性
在 Node.js 0.10 之前,Readable
流接口比较简单,但也功能较弱,实用性较差。
- 不会等待对
stream.read()
方法的调用,'data'
事件会立即开始发出。需要执行一定量的工作来决定如何处理数据的应用程序需要将读取的数据存储到缓冲区中,以免数据丢失。 stream.pause()
方法是建议性的,而不是保证性的。这意味着即使流处于暂停状态,仍然需要准备好接收'data'
事件。
在 Node.js 0.10 中,添加了 Readable
类。为了与旧版 Node.js 程序向后兼容,当添加 'data'
事件处理程序或调用 stream.resume()
方法时,Readable
流会切换到“流动模式”。其效果是,即使不使用新的 stream.read()
方法和 'readable'
事件,也不必担心丢失 'data'
块。
虽然大多数应用程序将继续正常运行,但这会在以下情况下引入一个边缘情况:
- 没有添加
'data'
事件监听器。 - 从未调用
stream.resume()
方法。 - 流未连接到任何可写的目标。
例如,考虑以下代码:
// WARNING! BROKEN!
net
.createServer(socket => {
// 我们添加了一个 'end' 监听器,但从未使用过数据。
socket.on('end', () => {
// 它永远不会到达这里。
socket.end('The message was received but was not processed.\n')
})
})
.listen(1337)
在 Node.js 0.10 之前,传入的消息数据将被简单地丢弃。但是,在 Node.js 0.10 及更高版本中,套接字将永远保持暂停状态。
在这种情况下,解决方法是调用 stream.resume()
方法以开始数据流:
// 解决方法。
net
.createServer(socket => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n')
})
// 开始数据流,并丢弃它。
socket.resume()
})
.listen(1337)
除了新的 Readable
流切换到流动模式外,还可以使用 readable.wrap()
方法将 0.10 之前的样式流包装在 Readable
类中。
readable.read(0)
在某些情况下,需要触发底层可读流机制的刷新,而无需实际消耗任何数据。在这种情况下,可以调用 readable.read(0)
,它将始终返回 null
。
如果内部读取缓冲区低于 highWaterMark
,并且流当前未读取,则调用 stream.read(0)
将触发低级 stream._read()
调用。
虽然大多数应用程序几乎不需要这样做,但在 Node.js 中存在这种情况,尤其是在 Readable
流类内部。
readable.push('')
不建议使用 readable.push('')
。
向非对象模式的流推送零字节的 <string>,<Buffer>,<TypedArray> 或 <DataView> 会产生一个有趣的副作用。因为它是对 readable.push()
的调用,所以该调用将结束读取过程。但是,由于参数是空字符串,因此不会向可读缓冲区添加任何数据,因此用户无法消费任何内容。
调用 readable.setEncoding()
后 highWaterMark
的差异
使用 readable.setEncoding()
会改变非对象模式下 highWaterMark
的运行方式。
通常,当前缓冲区的大小是用字节来衡量,并与 highWaterMark
进行比较。但是,在调用 setEncoding()
之后,比较函数将开始用字符来衡量缓冲区的大小。
对于 latin1
或 ascii
,这通常不是问题。但是,在处理可能包含多字节字符的字符串时,建议注意这种行为。