流处理中的背压
数据处理过程中存在一个普遍问题,称为背压,它描述了数据传输过程中缓冲区后面数据累积的情况。当传输的接收端执行复杂操作或由于任何原因速度较慢时,来自传入源的数据往往会累积,就像堵塞一样。
为了解决这个问题,必须有一个委托系统来确保数据从一个源到另一个源的平滑流动。不同的社区已经根据其程序独特地解决了这个问题,Unix 管道和 TCP 套接字就是很好的例子,通常被称为流量控制。在 Node.js 中,流已被采纳为解决方案。
本指南的目的是进一步详细说明背压是什么,以及流在 Node.js 源代码中是如何解决这个问题的。指南的第二部分将介绍一些建议的最佳实践,以确保你的应用程序代码在实现流时安全且经过优化。
我们假设你对 Node.js 中backpressure
、Buffer
和EventEmitters
的通用定义以及一些Stream
的经验略知一二。如果你还没有阅读这些文档,最好先查看一下API 文档,因为它将有助于你在阅读本指南时扩展你的理解。
数据处理的问题
在计算机系统中,数据通过管道、套接字和信号在不同的进程之间传输。在 Node.js 中,我们发现了类似的机制,称为 Stream
(流)。流很棒!它们为 Node.js 做了很多事情,内部代码库的几乎每个部分都使用了这个模块。作为开发者,也强烈建议你使用它们!
const readline = require('node:readline')
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
})
rl.question('为什么要使用流? ', answer => {
console.log(`也许是因为 ${answer},也许是因为它们很棒!`)
})
rl.close()
通过流实现的反压机制是一个很好的优化,一个很好的例子可以通过比较 Node.js 流实现的内部系统工具来演示。
在一个场景中,我们将获取一个大型文件(大约 -9 GB)并使用熟悉的 zip(1)
工具对其进行压缩。
zip The.Matrix.1080p.mkv
虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,该脚本使用 Node.js 的模块 zlib
,它围绕另一个压缩工具 gzip(1)
进行包装。
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')
const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')
inp.pipe(gzip).pipe(out)
要测试结果,请尝试打开每个压缩文件。zip(1)
工具压缩的文件会通知你文件已损坏,而 Stream 完成的压缩将无错误地解压缩。
注意
在这个例子中,我们使用 .pipe()
将数据源从一端传递到另一端。但是,请注意没有附加适当的错误处理程序。如果一部分数据未能正确接收,则 Readable 源或 gzip
流将不会被销毁。pump
是一个实用工具,如果管道中的一个流失败或关闭,它将正确地销毁管道中的所有流,在这种情况下是必须的!
pump
仅适用于 Node.js 8.x 或更早版本,对于 Node.js 10.x 或更高版本,引入了 pipeline
来替换 pump
。这是一个模块方法,用于在流之间传递错误,并正确清理并在管道完成后提供回调。
以下是使用 pipeline 的示例:
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// 使用 pipeline API 可以轻松地将一系列流连接在一起,并在管道完全完成时收到通知。
// 一个高效地压缩潜在巨大视频文件的管道:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err)
} else {
console.log('Pipeline succeeded')
}
}
)
你也可以使用 stream/promises
模块来使用带有 async / await
的 pipeline:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
)
console.log('Pipeline succeeded')
} catch (err) {
console.error('Pipeline failed', err)
}
}
数据过多,速度过快
某些情况下,Readable
流可能会向 Writable
流提供数据速度过快——远超消费者的处理能力!
发生这种情况时,消费者将开始排队等待所有数据块以便稍后处理。写入队列将越来越长,因此必须在内存中保留更多数据,直到整个过程完成。
写入磁盘比读取磁盘慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,由于写入磁盘无法跟上读取速度,就会发生背压。
// 流暗中表示:“等等,太快了!”
// 当写入尝试跟上传入的数据流时,数据将开始在数据缓冲区的读取端累积。
inp.pipe(gzip).pipe(outputFile)
这就是背压机制很重要的原因。如果没有背压系统,该过程将耗尽系统内存,有效地减慢其他进程的速度,并独占系统的大部分资源,直到完成。
这会导致以下几种情况:
- 减慢所有其他当前进程的速度
- 垃圾回收器超负荷工作
- 内存耗尽
在以下示例中,我们将取出 .write()
函数的返回值并将其更改为 true
,这实际上禁用了 Node.js 核心中的背压支持。在任何对 'modified'
二进制文件的引用中,我们指的是运行不包含 return ret;
行的 node 二进制文件,而是替换为 return true;
的二进制文件。
垃圾回收的过度拖累
让我们来看一个简单的基准测试。使用上面相同的例子,我们进行了几次计时试验,以获得两个二进制文件的中间时间。
试验编号 | `node` 二进制文件 (ms) | 修改后的 `node` 二进制文件 (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
平均时间: | 55299 | 55975
两者运行时间大约一分钟,所以几乎没有区别,但是让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具 dtrace
来评估 V8 垃圾回收器发生了什么。
GC(垃圾回收器)测量的时段表示垃圾回收器完成单次扫描的完整周期的间隔:
大约时间 (ms) | GC (ms) | 修改后的 GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
虽然这两个进程开始时相同,并且似乎以相同的速率工作 GC,但很明显,在安装了正常工作的背压系统后几秒钟,它将 GC 负载分散到 4-8 毫秒的一致间隔内,直到数据传输结束。
但是,如果没有背压系统,V8 垃圾回收就会开始拖延。正常的二进制文件调用 GC 在一分钟内大约触发 75 次,而修改后的二进制文件只触发 36 次。
这是由于内存使用量增长而逐渐累积的缓慢债务。在没有背压系统的情况下,随着数据的传输,每次块传输都会使用更多的内存。
分配的内存越多,GC 在一次扫描中就需要处理得越多。扫描越大,GC 就越需要决定什么可以释放,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。
内存耗尽
为了确定每个二进制文件的内存消耗,我们使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
分别对每个进程进行了计时。
这是普通二进制文件的输出:
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
虚拟内存占用的最大字节大小约为 87.81 MB。
现在更改 .write()
函数的返回值,我们得到:
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
虚拟内存占用的最大字节大小约为 1.52 GB。
如果没有就绪的流来委派背压,则分配的内存空间数量级更大——相同进程之间存在巨大的差异!
这个实验展示了 Node.js 背压机制对于您的计算系统的优化和成本效益。现在,让我们分解一下它的工作原理!
反压如何解决这些问题?
有多种函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的内部函数叫做 .pipe()
。当然,你也可以使用其他包!但最终,在这个过程的基本层面,我们有两个独立的组件:数据源和消费者。
当从源调用 .pipe()
时,它会向消费者发出信号,表明有数据要传输。pipe 函数有助于为事件触发器设置适当的反压闭包。
在 Node.js 中,源是 Readable
流,消费者是 Writable
流(这两个都可以与 Duplex 或 Transform 流互换,但这超出了本指南的范围)。
反压触发的时刻可以精确地缩小到 Writable
的 .write()
函数的返回值。当然,这个返回值是由一些条件决定的。
在任何数据缓冲区超过 highwaterMark
或写入队列当前繁忙的情况下,.write()
将 返回 false
。
当返回 false
值时,反压系统启动。它将暂停传入的 Readable
流发送任何数据,并等待消费者再次准备好。一旦数据缓冲区清空,将发出 'drain'
事件并恢复传入的数据流。
队列完成后,反压将允许再次发送数据。正在使用的内存空间将释放自身并准备下一批数据。
这有效地允许在任何给定时间为 .pipe()
函数使用固定数量的内存。不会出现内存泄漏,也不会出现无限缓冲,垃圾收集器只需要处理内存中的一个区域!
所以,如果反压如此重要,为什么你(可能)没有听说过它呢?答案很简单:Node.js 自动为你完成所有这些工作。
这太好了!但当我们试图理解如何实现自定义流时,这也不是那么好。
注意
在大多数机器中,有一个字节大小决定缓冲区何时已满(这在不同的机器上会有所不同)。Node.js 允许你设置自定义的 highWaterMark
,但通常情况下,默认设置为 16kb(16384,或 objectMode 流的 16)。如果你想提高这个值,尽管去做,但要谨慎!
.pipe()
的生命周期
为了更好地理解背压,这里有一个关于 Readable
流被 管道化 到 Writable
流的生命周期流程图:
+===================+
x--> 管道函数 +--> src.pipe(dest) |
x 在 .pipe 方法中 |===================|
x 设置。 | 事件回调 |
+===============+ x |-------------------|
| 你的数据 | x 它们存在于数据流 | .on('close', cb) |
+=======+=======+ x 之外,但 | .on('data', cb) |
| x 重要的是附加了 | .on('drain', cb) |
| x 事件及其 | .on('unpipe', cb) |
+---------v---------+ x 相应的回调函数。 | .on('error', cb) |
| Readable 流 +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable 流 +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | 这个块太大了吗? |
^ | | emit .end(); | 队列繁忙吗? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< 否 | | 是 |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ 返回 false; <-----+---+
| +=================+ |
| |
^ 当队列为空时 +============+ |
^------------^-----------------------< 缓冲 | |
| |============| |
+> emit .drain(); | ^缓冲区^ | |
+> emit .resume(); +------------+ |
| ^缓冲区^ | |
+------------+ 将块添加到队列 |
| <---^---------------------<
+============+
注意
如果你正在设置一个管道来连接一些流来操作你的数据,你很可能会实现 Transform 流。
在这种情况下,你的 Readable
流的输出将进入 Transform
并管道到 Writable
。
Readable.pipe(Transformable).pipe(Writable)
背压将自动应用,但请注意,Transform
流的传入和传出的 highwaterMark
都可以被操作,并将影响背压系统。
背压指南
从 Node.js v0.10 开始,Stream 类就提供了使用这些函数的下划线版本(._read()
和 ._write()
)来修改 .read()
或 .write()
行为的能力。
已经有关于实现 Readable 流和实现 Writable 流的文档指南。我们假设您已经阅读过这些指南,下一节将更深入地探讨。
实现自定义流时需遵守的规则
流的黄金法则始终是尊重背压。最佳实践的构成是不矛盾的实践。只要您小心避免与内部背压支持冲突的行为,您就可以确信自己遵循了良好的实践。
一般来说,
- 如果没有被请求,就不要
.push()
。 .write()
返回 false 后不要调用它,而是等待 'drain' 事件。- 不同 Node.js 版本和您使用的库之间,流会有所变化。要小心并进行测试。
注意
关于第 3 点,一个非常有用的构建浏览器流的包是 readable-stream
。Rodd Vagg 撰写了一篇精彩的博文 描述了这个库的实用性。简而言之,它为 Readable 流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。
可读流的特定规则
到目前为止,我们已经了解了 .write()
如何影响背压,并且主要关注了可写流。由于 Node.js 的功能,数据在技术上是从可读流向下游流向可写流的。但是,正如我们在任何数据、物质或能量传输中观察到的那样,源与目标同样重要,而可读流对于如何处理背压至关重要。
这两个进程依赖于彼此有效地进行通信,如果可读流忽略了可写流要求它停止发送数据的请求,这就像 .write()
的返回值不正确一样成问题。
因此,除了尊重 .write()
的返回值外,我们还必须尊重 ._read()
方法中使用的 .push()
的返回值。如果 .push()
返回假值,则流将停止从源读取。否则,它将继续运行而不会暂停。
这是一个使用 .push()
的不良实践示例:
// 这很成问题,因为它完全忽略了 push 的返回值,
// 这可能是来自目标流的背压信号!
class MyReadable extends Readable {
_read(size) {
let chunk
while (null == (chunk = getNextChunk())) {
this.push(chunk)
}
}
}
此外,在自定义流之外,忽略背压也存在陷阱。在这个良好实践的反例中,应用程序的代码强制在数据可用时(由 'data'
事件发出信号)推送数据:
// 这忽略了 Node.js 已设置的背压机制,
// 并无条件地推送数据,而不管目标流是否已准备好接收它。
readable.on('data', data => writable.write(data))
这是一个使用 .push()
和可读流的示例。
const { Readable } = require('node:stream')
// 创建一个自定义可读流
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// 将一些数据推送到流中
this.push({ message: 'Hello, world!' })
this.push(null) // 标记流的结尾
},
})
// 使用流
myReadableStream.on('data', chunk => {
console.log(chunk)
})
// 输出:
// { message: 'Hello, world!' }
可写流的特定规则
回想一下,.write()
方法可能会根据某些条件返回 true 或 false。幸运的是,当我们构建自己的可写流时,流状态机将处理我们的回调,并确定何时处理背压并为我们优化数据流。但是,当我们想要直接使用可写流时,必须遵守 .write()
的返回值并密切注意这些条件:
- 如果写入队列繁忙,
.write()
将返回 false。 - 如果数据块太大,
.write()
将返回 false(限制由变量highWaterMark
指示)。
在这个例子中,我们创建一个自定义的可读流,使用 .push()
将单个对象推送到流中。当流准备好消费数据时,会调用 ._read()
方法,在这种情况下,我们立即将一些数据推送到流中,并通过推送 null
来标记流的结束。
const stream = require('stream')
class MyReadable extends stream.Readable {
constructor() {
super()
}
_read() {
const data = { message: 'Hello, world!' }
this.push(data)
this.push(null)
}
}
const readableStream = new MyReadable()
readableStream.pipe(process.stdout)
然后,我们通过监听 'data' 事件并记录推送到流中的每个数据块来使用流。在这种情况下,我们只向流中推送单个数据块,因此我们只看到一条日志消息。
可写流的特定规则
回想一下,.write()
方法可能会根据某些条件返回 true 或 false。幸运的是,当我们构建自己的可写流时,流状态机将处理我们的回调,并确定何时处理背压并为我们优化数据流。
但是,当我们想直接使用可写流时,必须遵守 .write()
的返回值并密切注意这些条件:
- 如果写入队列繁忙,
.write()
将返回 false。 - 如果数据块太大,
.write()
将返回 false(限制由变量 highWaterMark 指示)。
class MyWritable extends Writable {
// 由于 JavaScript 回调的异步特性,此可写流无效。
// 如果在最后一个回调之前没有为每个回调返回语句,
// 则很有可能调用多个回调。
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback()
else if (chunk.toString().indexOf('b') >= 0) callback()
callback()
}
}
在实现 ._writev()
时,也有一些需要注意的地方。该函数与 .cork()
耦合,但在编写时有一个常见的错误:
// 在这里两次使用 .uncork() 会在 C++ 层进行两次调用,从而使 cork/uncork 技术失效。
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()
ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()
// 正确的写法是利用 process.nextTick(),它在下一个事件循环中触发。
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)
ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)
// 作为全局函数。
function doUncork(stream) {
stream.uncork()
}
.cork()
可以调用任意多次,我们只需要小心调用相同次数的 .uncork()
来使其再次流动。
结论
流是 Node.js 中经常使用的模块。它们对于内部结构很重要,对于开发者来说,可以扩展和连接 Node.js 模块生态系统。
希望你现在能够考虑到背压,安全地编写你自己的 Writable
和 Readable
流并进行故障排除,并与同事和朋友分享你的知识。
务必阅读更多关于 Stream
的内容,了解其他 API 函数,以帮助改进和释放你在使用 Node.js 构建应用程序时的流处理能力。