Web Streams API
[历史]
版本 | 变更 |
---|---|
v21.0.0 | 不再是实验性功能。 |
v18.0.0 | 使用此 API 将不再发出运行时警告。 |
v16.5.0 | 在 v16.5.0 版本中添加 |
WHATWG Streams 标准 的实现。
概述
WHATWG Streams 标准(或“web streams”)定义了一个处理流数据的 API。它类似于 Node.js 的 Streams API,但出现较晚,并且已成为许多 JavaScript 环境中流式处理数据的“标准”API。
有三种主要类型的对象:
ReadableStream
- 表示流数据源。WritableStream
- 表示流数据的目标。TransformStream
- 表示转换流数据的算法。
ReadableStream
示例
此示例创建一个简单的 ReadableStream
,它每秒推送一次当前的 performance.now()
时间戳,持续进行。异步迭代器用于从流中读取数据。
import { ReadableStream } from 'node:stream/web'
import { setInterval as every } from 'node:timers/promises'
import { performance } from 'node:perf_hooks'
const SECOND = 1000
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND)) controller.enqueue(performance.now())
},
})
for await (const value of stream) console.log(value)
const { ReadableStream } = require('node:stream/web')
const { setInterval: every } = require('node:timers/promises')
const { performance } = require('node:perf_hooks')
const SECOND = 1000
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND)) controller.enqueue(performance.now())
},
})
;(async () => {
for await (const value of stream) console.log(value)
})()
API
类: ReadableStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已暴露在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
new ReadableStream([underlyingSource [, strategy]])
新增于: v16.5.0
underlyingSource
<对象>start
<函数>ReadableStream
创建时立即调用的用户定义函数。controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>返回值:
undefined
或一个以undefined
为结果的 Promise。pull
<函数> 当ReadableStream
内部队列未满时重复调用的用户定义函数。操作可以是同步的或异步的。如果是异步的,则在先前返回的 Promise 完成之前不会再次调用此函数。controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>返回值: 一个以
undefined
为结果的 Promise。cancel
<函数>ReadableStream
被取消时调用的用户定义函数。reason
<任意>返回值: 一个以
undefined
为结果的 Promise。type
<字符串> 必须为'bytes'
或undefined
。autoAllocateChunkSize
<数字> 仅当type
等于'bytes'
时使用。当设置为非零值时,会自动为ReadableByteStreamController.byobRequest
分配一个视图缓冲区。如果没有设置,则必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader
传输数据。
strategy
<对象>
readableStream.locked
新增于:v16.5.0
- 类型:<boolean> 如果此 <ReadableStream> 有一个活动的读取器,则设置为
true
。
readableStream.locked
属性默认值为 false
,当有活动的读取器正在使用流数据时,它会切换为 true
。
readableStream.cancel([reason])
新增于:v16.5.0
reason
<any>- 返回值:取消完成后,返回一个值为
undefined
的已完成 Promise。
readableStream.getReader([options])
新增于:v16.5.0
options
<Object>mode
<string>'byob'
或undefined
返回值:<ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web'
const stream = new ReadableStream()
const reader = stream.getReader()
console.log(await reader.read())
const { ReadableStream } = require('node:stream/web')
const stream = new ReadableStream()
const reader = stream.getReader()
reader.read().then(console.log)
导致 readableStream.locked
变为 true
。
readableStream.pipeThrough(transform[, options])
新增于: v16.5.0
transform
<对象>readable
<ReadableStream>transform.writable
将把从该ReadableStream
收到的可能已修改的数据推送到此ReadableStream
。writable
<WritableStream> 此ReadableStream
的数据将写入此WritableStream
。
options
<对象>preventAbort
<布尔值> 当值为true
时,此ReadableStream
中的错误不会导致transform.writable
被中止。preventCancel
<布尔值> 当值为true
时,目标transform.writable
中的错误不会导致此ReadableStream
被取消。preventClose
<布尔值> 当值为true
时,关闭此ReadableStream
不会导致transform.writable
被关闭。signal
<AbortSignal> 允许使用 <AbortController> 取消数据的传输。
返回值: <ReadableStream> 来自
transform.readable
。
将此 <ReadableStream> 连接到 transform
参数中提供的 <ReadableStream> 和 <WritableStream> 对,以便将来自此 <ReadableStream> 的数据写入 transform.writable
(可能已转换),然后推送到 transform.readable
。配置管道后,将返回 transform.readable
。
在管道操作处于活动状态时,会导致 readableStream.locked
为 true
。
import { ReadableStream, TransformStream } from 'node:stream/web'
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a')
},
})
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
},
})
const transformedStream = stream.pipeThrough(transform)
for await (const chunk of transformedStream) console.log(chunk)
// 输出:A
const { ReadableStream, TransformStream } = require('node:stream/web')
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a')
},
})
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
},
})
const transformedStream = stream.pipeThrough(transform)
;(async () => {
for await (const chunk of transformedStream) console.log(chunk)
// 输出:A
})()
readableStream.pipeTo(destination[, options])
新增于: v16.5.0
destination
<WritableStream> 一个 <WritableStream> ,此ReadableStream
的数据将写入其中。options
<Object>preventAbort
<boolean> 当true
时,此ReadableStream
中的错误不会导致destination
被中止。preventCancel
<boolean> 当true
时,destination
中的错误不会导致此ReadableStream
被取消。preventClose
<boolean> 当true
时,关闭此ReadableStream
不会导致destination
被关闭。signal
<AbortSignal> 允许使用 <AbortController> 取消数据的传输。
返回值:一个以
undefined
为结果完成的 Promise
在管道操作处于活动状态时,使 readableStream.locked
为 true
。
readableStream.tee()
[历史]
版本 | 变更 |
---|---|
v18.10.0, v16.18.0 | 支持复制可读字节流。 |
v16.5.0 | v16.5.0 版本中添加 |
- 返回值: <ReadableStream[]>
返回一对新的 <ReadableStream> 实例,此 ReadableStream
的数据将被转发到这两个实例。每个实例都将接收相同的数据。
导致 readableStream.locked
为 true
。
readableStream.values([options])
v16.5.0 版本中添加
options
<对象>preventCancel
<布尔值> 当值为true
时,阻止异步迭代器突然终止时关闭 <ReadableStream>。默认值:false
。
创建并返回一个异步迭代器,用于使用此 ReadableStream
的数据。
在异步迭代器活动期间,导致 readableStream.locked
为 true
。
import { Buffer } from 'node:buffer'
const stream = new ReadableStream(getSomeSource())
for await (const chunk of stream.values({ preventCancel: true })) console.log(Buffer.from(chunk).toString())
异步迭代
<ReadableStream>
对象使用 for await
语法支持异步迭代器协议。
import { Buffer } from 'node:buffer'
const stream = new ReadableStream(getSomeSource())
for await (const chunk of stream) console.log(Buffer.from(chunk).toString())
异步迭代器将消耗 <ReadableStream>
直到其终止。
默认情况下,如果异步迭代器提前退出(通过 break
、return
或 throw
),则 <ReadableStream>
将被关闭。为了防止自动关闭 <ReadableStream>
,可以使用 readableStream.values()
方法获取异步迭代器并将 preventCancel
选项设置为 true
。
<ReadableStream>
必须未被锁定(即,它不能拥有现有的活动读取器)。在异步迭代期间,<ReadableStream>
将被锁定。
使用 postMessage()
传输
可以使用 <MessagePort> 传输 <ReadableStream> 实例。
const stream = new ReadableStream(getReadableSourceSomehow())
const { port1, port2 } = new MessageChannel()
port1.onmessage = ({ data }) => {
data
.getReader()
.read()
.then(chunk => {
console.log(chunk)
})
}
port2.postMessage(stream, [stream])
ReadableStream.from(iterable)
新增于:v20.6.0
iterable
<Iterable> 实现Symbol.asyncIterator
或Symbol.iterator
可迭代协议的对象。
一个实用方法,用于从可迭代对象创建一个新的 <ReadableStream>。
import { ReadableStream } from 'node:stream/web'
async function* asyncIterableGenerator() {
yield 'a'
yield 'b'
yield 'c'
}
const stream = ReadableStream.from(asyncIterableGenerator())
for await (const chunk of stream) console.log(chunk) // 输出:'a', 'b', 'c'
const { ReadableStream } = require('node:stream/web')
async function* asyncIterableGenerator() {
yield 'a'
yield 'b'
yield 'c'
}
;(async () => {
const stream = ReadableStream.from(asyncIterableGenerator())
for await (const chunk of stream) console.log(chunk) // 输出:'a', 'b', 'c'
})()
类: ReadableStreamDefaultReader
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已在全局对象上公开。 |
v16.5.0 | 新增于: v16.5.0 |
默认情况下,调用 readableStream.getReader()
且不带任何参数将返回 ReadableStreamDefaultReader
的实例。默认读取器将流中传递的数据块视为不透明值,这允许 <ReadableStream> 使用任何 JavaScript 值。
new ReadableStreamDefaultReader(stream)
新增于: v16.5.0
stream
<ReadableStream>
创建一个新的 <ReadableStreamDefaultReader>,它被锁定到给定的 <ReadableStream>。
readableStreamDefaultReader.cancel([reason])
新增于: v16.5.0
reason
<any>- 返回值: 一个以
undefined
为结果完成的 Promise。
取消 <ReadableStream> 并返回一个 Promise,该 Promise 在底层流被取消后完成。
readableStreamDefaultReader.closed
新增于:v16.5.0
- 类型:<Promise> 当关联的 <ReadableStream> 关闭时,结果为
undefined
;如果流出错或在流完成关闭之前释放了读取器的锁,则会被拒绝。
readableStreamDefaultReader.read()
新增于:v16.5.0
请求来自底层 <ReadableStream> 的下一个数据块,并返回一个 Promise,一旦数据可用,该 Promise 就会完成并返回数据。
readableStreamDefaultReader.releaseLock()
新增于:v16.5.0
释放此读取器对底层 <ReadableStream> 的锁。
类: ReadableStreamBYOBReader
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已在全局对象上公开。 |
v16.5.0 | 新增于:v16.5.0 |
ReadableStreamBYOBReader
是面向字节的 <ReadableStream>(在创建 ReadableStream
时,将 underlyingSource.type
设置为 'bytes'
的那些流)的替代消费者。
BYOB
是“自带缓冲区”(bring your own buffer)的缩写。这是一种模式,允许更有效地读取面向字节的数据,避免不必要的复制。
import { open } from 'node:fs/promises'
import { ReadableStream } from 'node:stream/web'
import { Buffer } from 'node:buffer'
class Source {
type = 'bytes'
autoAllocateChunkSize = 1024
async start(controller) {
this.file = await open(new URL(import.meta.url))
this.controller = controller
}
async pull(controller) {
const view = controller.byobRequest?.view
const { bytesRead } = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
})
if (bytesRead === 0) {
await this.file.close()
this.controller.close()
}
controller.byobRequest.respond(bytesRead)
}
}
const stream = new ReadableStream(new Source())
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' })
const chunks = []
let result
do {
result = await reader.read(Buffer.alloc(100))
if (result.value !== undefined) chunks.push(Buffer.from(result.value))
} while (!result.done)
return Buffer.concat(chunks)
}
const data = await read(stream)
console.log(Buffer.from(data).toString())
new ReadableStreamBYOBReader(stream)
新增于: v16.5.0
stream
<ReadableStream>
创建一个新的 ReadableStreamBYOBReader
,它被锁定到给定的 <ReadableStream>。
readableStreamBYOBReader.cancel([reason])
新增于: v16.5.0
reason
<any>- 返回值: 一个以
undefined
作为结果值完成的 Promise。
取消 <ReadableStream> 并返回一个 Promise,该 Promise 在底层流被取消时完成。
readableStreamBYOBReader.closed
新增于: v16.5.0
- 类型: <Promise> 当关联的 <ReadableStream> 关闭时,以
undefined
作为结果值完成;如果流出错或在流完成关闭之前释放了读取器的锁,则该 Promise 将被拒绝。
readableStreamBYOBReader.read(view[, options])
[历史]
版本 | 变更 |
---|---|
v21.7.0, v20.17.0 | 新增 min 选项。 |
v16.5.0 | 首次引入: v16.5.0 |
view
<Buffer> | <TypedArray> | <DataView>options
<Object>min
<number> 设置后,返回的 Promise 只有在至少有min
个元素可用时才会被 fulfilled。未设置时,Promise 在至少有一个元素可用时即被 fulfilled。
返回值: 一个被 fulfilled 的 Promise,包含以下对象:
value
<TypedArray> | <DataView>done
<boolean>
向底层的 <ReadableStream> 请求下一块数据,并返回一个 Promise,该 Promise 在数据可用时被 fulfilled。
不要将池化的 <Buffer> 对象实例传递给此方法。池化 Buffer
对象是使用 Buffer.allocUnsafe()
或 Buffer.from()
创建的,或者经常由各种 node:fs
模块回调返回。这些类型的 Buffer
使用共享的底层 <ArrayBuffer> 对象,该对象包含所有池化 Buffer
实例的所有数据。当 Buffer
、<TypedArray> 或 <DataView> 传递给 readableStreamBYOBReader.read()
时,视图的底层 ArrayBuffer
将被 分离,使该 ArrayBuffer
上可能存在的现有所有视图无效。这可能会对您的应用程序产生灾难性的后果。
readableStreamBYOBReader.releaseLock()
新增于: v16.5.0
释放此读取器对底层 <ReadableStream> 的锁定。
类: ReadableStreamDefaultController
新增于: v16.5.0
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。ReadableStreamDefaultController
是非字节导向的 ReadableStream
的默认控制器实现。
readableStreamDefaultController.close()
新增于: v16.5.0
关闭与此控制器关联的 <ReadableStream>。
readableStreamDefaultController.desiredSize
新增于: v16.5.0
- 类型: <number>
返回 <ReadableStream> 队列中剩余的待填充数据量。
readableStreamDefaultController.enqueue([chunk])
新增于: v16.5.0
chunk
<any>
将新的数据块添加到 <ReadableStream> 的队列中。
readableStreamDefaultController.error([error])
新增于: v16.5.0
error
<any>
发出错误信号,导致 <ReadableStream> 出错并关闭。
类: ReadableByteStreamController
[历史]
版本 | 变更 |
---|---|
v18.10.0 | 支持处理来自已释放读取器的 BYOB 拉取请求。 |
v16.5.0 | 新增于: v16.5.0 |
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。ReadableByteStreamController
用于面向字节的 ReadableStream
。
readableByteStreamController.byobRequest
新增于: v16.5.0
readableByteStreamController.close()
新增于: v16.5.0
关闭与此控制器关联的 <ReadableStream>。
readableByteStreamController.desiredSize
新增于: v16.5.0
- 类型: <number>
返回 <ReadableStream> 队列中剩余的待填充数据量。
readableByteStreamController.enqueue(chunk)
新增于: v16.5.0
chunk
: <Buffer> | <TypedArray> | <DataView>
向 <ReadableStream> 队列追加新的数据块。
readableByteStreamController.error([error])
新增于:v16.5.0
error
<any>
发出一个错误信号,导致 <ReadableStream> 出错并关闭。
类:ReadableStreamBYOBRequest
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在暴露在全局对象上。 |
v16.5.0 | 新增于:v16.5.0 |
在面向字节的流中使用 ReadableByteStreamController
,以及使用 ReadableStreamBYOBReader
时,readableByteStreamController.byobRequest
属性提供对 ReadableStreamBYOBRequest
实例的访问,该实例表示当前的读取请求。该对象用于访问已提供用于填充读取请求的 ArrayBuffer
/TypedArray
,并提供用于发出已提供数据的信号的方法。
readableStreamBYOBRequest.respond(bytesWritten)
新增于:v16.5.0
bytesWritten
<number>
发出已向 readableStreamBYOBRequest.view
写入 bytesWritten
个字节的信号。
readableStreamBYOBRequest.respondWithNewView(view)
新增于: v16.5.0
view
<Buffer> | <TypedArray> | <DataView>
表示请求已使用写入新Buffer
、TypedArray
或DataView
的字节完成。
readableStreamBYOBRequest.view
新增于: v16.5.0
- 类型: <Buffer> | <TypedArray> | <DataView>
类: WritableStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已暴露在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
WritableStream
是发送流数据的目标。
import { WritableStream } from 'node:stream/web'
const stream = new WritableStream({
write(chunk) {
console.log(chunk)
},
})
await stream.getWriter().write('Hello World')
new WritableStream([underlyingSink[, strategy]])
新增于: v16.5.0
underlyingSink
<Object>start
<Function> 当创建WritableStream
时立即调用的用户定义函数。controller
<WritableStreamDefaultController>返回值:
undefined
或一个以undefined
为结果的 Promise。write
<Function> 当数据块写入WritableStream
时调用的用户定义函数。chunk
<any>controller
<WritableStreamDefaultController>返回值: 一个以
undefined
为结果的 Promise。close
<Function> 当WritableStream
关闭时调用的用户定义函数。返回值: 一个以
undefined
为结果的 Promise。abort
<Function> 用于突然关闭WritableStream
的用户定义函数。reason
<any>返回值: 一个以
undefined
为结果的 Promise。type
<any>type
选项保留供将来使用,并且必须为undefined
。
strategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 用于识别每个数据块大小的用户定义函数。chunk
<any>- 返回值: <number>
writableStream.abort([reason])
Added in: v16.5.0
reason
<any>- 返回值:一个以
undefined
为结果值完成的 Promise。
突然终止 WritableStream
。所有排队的写入操作都将被取消,其关联的 Promise 将被拒绝。
writableStream.close()
Added in: v16.5.0
- 返回值:一个以
undefined
为结果值完成的 Promise。
当不再期望额外的写入操作时,关闭 WritableStream
。
writableStream.getWriter()
Added in: v16.5.0
创建一个新的写入器实例,并返回该实例,可以使用它将数据写入 WritableStream
。
writableStream.locked
Added in: v16.5.0
- 类型:<boolean>
writableStream.locked
属性默认为 false
,当存在附加到此 WritableStream
的活动写入器时,它将切换为 true
。
使用 postMessage() 传输
可以使用 <MessagePort> 传输 <WritableStream> 实例。
const stream = new WritableStream(getWritableSinkSomehow())
const { port1, port2 } = new MessageChannel()
port1.onmessage = ({ data }) => {
data.getWriter().write('hello')
}
port2.postMessage(stream, [stream])
类: WritableStreamDefaultWriter
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已在全局对象上公开。 |
v16.5.0 | 新增于:v16.5.0 |
new WritableStreamDefaultWriter(stream)
新增于:v16.5.0
stream
<WritableStream>
创建一个新的 WritableStreamDefaultWriter
,它被锁定到给定的 WritableStream
。
writableStreamDefaultWriter.abort([reason])
新增于:v16.5.0
reason
<任意>- 返回:一个以
undefined
作为结果完成的 Promise。
突然终止 WritableStream
。所有排队的写入都将被取消,其关联的 Promise 将被拒绝。
writableStreamDefaultWriter.close()
新增于:v16.5.0
- 返回值:一个以
undefined
为结果值完成的 Promise。
当不再需要写入额外数据时,关闭 WritableStream
。
writableStreamDefaultWriter.closed
新增于:v16.5.0
- 类型:一个 <Promise> ,当关联的 <WritableStream> 关闭时,结果值为
undefined
;如果流出错或在流完成关闭之前释放了写入器的锁,则会被拒绝。
writableStreamDefaultWriter.desiredSize
新增于:v16.5.0
- 类型: <number>
填充 <WritableStream> 队列所需的数据量。
writableStreamDefaultWriter.ready
新增于:v16.5.0
- 类型:一个 <Promise> ,当写入器准备好使用时,结果值为
undefined
。
writableStreamDefaultWriter.releaseLock()
新增于: v16.5.0
释放此写入器对底层 <ReadableStream> 的锁定。
writableStreamDefaultWriter.write([chunk])
新增于: v16.5.0
chunk
: <any>- 返回值: 一个以
undefined
为结果完成的 Promise。
将新的数据块添加到 <WritableStream> 的队列中。
类: WritableStreamDefaultController
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在暴露在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
WritableStreamDefaultController
管理 <WritableStream> 的内部状态。
writableStreamDefaultController.error([error])
新增于: v16.5.0
error
<any>
由用户代码调用,以指示在处理 WritableStream
数据时发生错误。调用此方法时, <WritableStream> 将被中止,当前挂起的写入将被取消。
writableStreamDefaultController.signal
- 类型: <AbortSignal> 一个
AbortSignal
,当 <WritableStream> 被中止时,可用于取消挂起的写入或关闭操作。
类:TransformStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已公开在全局对象上。 |
v16.5.0 | v16.5.0 中添加 |
TransformStream
由一个 <ReadableStream> 和一个 <WritableStream> 组成,它们连接在一起,以便写入 WritableStream
的数据在推入 ReadableStream
的队列之前会被接收,并可能被转换。
import { TransformStream } from 'node:stream/web'
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
},
})
await Promise.all([transform.writable.getWriter().write('A'), transform.readable.getReader().read()])
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])
新增于: v16.5.0
transformer
<Object>start
<Function>TransformStream
创建时立即调用的用户自定义函数。controller
<TransformStreamDefaultController>返回值:
undefined
或一个以undefined
为结果的 Promisetransform
<Function> 接收并可能修改写入transformStream.writable
的数据块,然后将其转发到transformStream.readable
的用户自定义函数。chunk
<any>controller
<TransformStreamDefaultController>返回值: 一个以
undefined
为结果的 Promise。flush
<Function> 在TransformStream
的写入端关闭之前立即调用的用户自定义函数,表示转换过程结束。controller
<TransformStreamDefaultController>返回值: 一个以
undefined
为结果的 Promise。readableType
<any>readableType
选项保留供将来使用,并且必须为undefined
。writableType
<any>writableType
选项保留供将来使用,并且必须为undefined
。
writableStrategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 用于识别每个数据块大小的用户自定义函数。chunk
<any>- 返回值: <number>
readableStrategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 用于识别每个数据块大小的用户自定义函数。chunk
<any>- 返回值: <number>
transformStream.readable
新增于: v16.5.0
- 类型: <ReadableStream>
transformStream.writable
新增于: v16.5.0
- 类型: <WritableStream>
使用 postMessage()
传输
一个 <TransformStream> 实例可以使用 <MessagePort> 传输。
const stream = new TransformStream()
const { port1, port2 } = new MessageChannel()
port1.onmessage = ({ data }) => {
const { writable, readable } = data
// ...
}
port2.postMessage(stream, [stream])
类: TransformStreamDefaultController
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在暴露在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
TransformStreamDefaultController
管理 TransformStream
的内部状态。
transformStreamDefaultController.desiredSize
新增于: v16.5.0
- 类型: <number>
需要填充可读端队列的数据量。
transformStreamDefaultController.enqueue([chunk])
新增于: v16.5.0
chunk
<any>
将一段数据添加到可读端的队列。
transformStreamDefaultController.error([reason])
新增于: v16.5.0
reason
<any>
向可读端和可写端发出信号,指示在处理转换数据时发生错误,导致两端都突然关闭。
transformStreamDefaultController.terminate()
新增于: v16.5.0
关闭传输的可读端,并导致可写端因错误而突然关闭。
类: ByteLengthQueuingStrategy
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已公开在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
new ByteLengthQueuingStrategy(init)
新增于: v16.5.0
byteLengthQueuingStrategy.highWaterMark
新增于: v16.5.0
- 类型: <number>
byteLengthQueuingStrategy.size
新增于: v16.5.0
- 类型: <Function>
类: CountQueuingStrategy
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已暴露在全局对象上。 |
v16.5.0 | 新增于: v16.5.0 |
new CountQueuingStrategy(init)
新增于: v16.5.0
countQueuingStrategy.highWaterMark
新增于: v16.5.0
- 类型: <number>
countQueuingStrategy.size
新增于: v16.5.0
- 类型: <Function>
类: TextEncoderStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已暴露在全局对象上。 |
v16.6.0 | 新增于: v16.6.0 |
new TextEncoderStream()
新增于: v16.6.0
创建一个新的 TextEncoderStream
实例。
textEncoderStream.encoding
新增于: v16.6.0
- 类型: <string>
TextEncoderStream
实例支持的编码。
textEncoderStream.readable
新增于: v16.6.0
- 类型: <ReadableStream>
textEncoderStream.writable
新增于: v16.6.0
- 类型: <WritableStream>
类: TextDecoderStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已在全局对象上公开。 |
v16.6.0 | 新增于: v16.6.0 |
new TextDecoderStream([encoding[, options]])
新增于: v16.6.0
创建新的 TextDecoderStream
实例。
textDecoderStream.encoding
新增于:v16.6.0
- 类型: <string>
TextDecoderStream
实例支持的编码。
textDecoderStream.fatal
新增于:v16.6.0
- 类型: <boolean>
如果解码错误导致抛出 TypeError
,则该值为 true
。
textDecoderStream.ignoreBOM
新增于:v16.6.0
- 类型: <boolean>
如果解码结果包含字节顺序标记,则该值为 true
。
textDecoderStream.readable
新增于:v16.6.0
- 类型: <ReadableStream>
textDecoderStream.writable
新增于:v16.6.0
- 类型: <WritableStream>
类:CompressionStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已暴露在全局对象上。 |
v17.0.0 | 新增于:v17.0.0 |
new CompressionStream(format)
[历史]
版本 | 变更 |
---|---|
v21.2.0, v20.12.0 | format 现在接受 deflate-raw 值。 |
v17.0.0 | v17.0.0 版本中添加 |
format
<字符串>'deflate'
,'deflate-raw'
或'gzip'
之一。
compressionStream.readable
v17.0.0 版本中添加
- 类型: <可读流>
compressionStream.writable
v17.0.0 版本中添加
- 类型: <可写流>
类: DecompressionStream
[历史]
版本 | 变更 |
---|---|
v18.0.0 | 此类现在已在全局对象上公开。 |
v17.0.0 | v17.0.0 版本中添加 |
new DecompressionStream(format)
[历史]
版本 | 变更 |
---|---|
v21.2.0, v20.12.0 | format 现在接受 deflate-raw 值。 |
v17.0.0 | v17.0.0 版本中添加 |
format
<字符串>'deflate'
,'deflate-raw'
或'gzip'
之一。
decompressionStream.readable
新增于:v17.0.0
decompressionStream.writable
新增于:v17.0.0
实用程序消费者
新增于:v16.7.0
实用程序消费者函数提供了用于使用流的常用选项。
它们使用以下方式访问:
import { arrayBuffer, blob, buffer, json, text } from 'node:stream/consumers'
const { arrayBuffer, blob, buffer, json, text } = require('node:stream/consumers')
streamConsumers.arrayBuffer(stream)
新增于:v16.7.0
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回值:<Promise> 返回一个包含流全部内容的
ArrayBuffer
。
import { arrayBuffer } from 'node:stream/consumers'
import { Readable } from 'node:stream'
import { TextEncoder } from 'node:util'
const encoder = new TextEncoder()
const dataArray = encoder.encode('hello world from consumers!')
const readable = Readable.from(dataArray)
const data = await arrayBuffer(readable)
console.log(`from readable: ${data.byteLength}`)
// 打印:from readable: 76
const { arrayBuffer } = require('node:stream/consumers')
const { Readable } = require('node:stream')
const { TextEncoder } = require('node:util')
const encoder = new TextEncoder()
const dataArray = encoder.encode('hello world from consumers!')
const readable = Readable.from(dataArray)
arrayBuffer(readable).then(data => {
console.log(`from readable: ${data.byteLength}`)
// 打印:from readable: 76
})
streamConsumers.blob(stream)
新增于: v16.7.0
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回值: <Promise> 返回包含流全部内容的 <Blob>。
import { blob } from 'node:stream/consumers'
const dataBlob = new Blob(['hello world from consumers!'])
const readable = dataBlob.stream()
const data = await blob(readable)
console.log(`from readable: ${data.size}`)
// 输出:from readable: 27
const { blob } = require('node:stream/consumers')
const dataBlob = new Blob(['hello world from consumers!'])
const readable = dataBlob.stream()
blob(readable).then(data => {
console.log(`from readable: ${data.size}`)
// 输出:from readable: 27
})
streamConsumers.buffer(stream)
新增于: v16.7.0
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回值: <Promise> 返回一个包含流全部内容的 <Buffer>。
import { buffer } from 'node:stream/consumers'
import { Readable } from 'node:stream'
import { Buffer } from 'node:buffer'
const dataBuffer = Buffer.from('hello world from consumers!')
const readable = Readable.from(dataBuffer)
const data = await buffer(readable)
console.log(`from readable: ${data.length}`)
// 输出:from readable: 27
const { buffer } = require('node:stream/consumers')
const { Readable } = require('node:stream')
const { Buffer } = require('node:buffer')
const dataBuffer = Buffer.from('hello world from consumers!')
const readable = Readable.from(dataBuffer)
buffer(readable).then(data => {
console.log(`from readable: ${data.length}`)
// 输出:from readable: 27
})
streamConsumers.json(stream)
新增于: v16.7.0
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回值: <Promise> 解析为 UTF-8 编码字符串,然后通过
JSON.parse()
解析的流内容。
import { json } from 'node:stream/consumers'
import { Readable } from 'node:stream'
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
})
)
const readable = Readable.from(JSON.stringify(items))
const data = await json(readable)
console.log(`from readable: ${data.length}`)
// 输出:from readable: 100
const { json } = require('node:stream/consumers')
const { Readable } = require('node:stream')
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
})
)
const readable = Readable.from(JSON.stringify(items))
json(readable).then(data => {
console.log(`from readable: ${data.length}`)
// 输出:from readable: 100
})
streamConsumers.text(stream)
新增于: v16.7.0
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回值: <Promise> 解析为 UTF-8 编码字符串的流内容。
import { text } from 'node:stream/consumers'
import { Readable } from 'node:stream'
const readable = Readable.from('Hello world from consumers!')
const data = await text(readable)
console.log(`from readable: ${data.length}`)
// 打印:from readable: 27
const { text } = require('node:stream/consumers')
const { Readable } = require('node:stream')
const readable = Readable.from('Hello world from consumers!')
text(readable).then(data => {
console.log(`from readable: ${data.length}`)
// 打印:from readable: 27
})