Skip to content

流处理中的背压

数据处理过程中存在一个普遍问题,称为背压,它描述了数据传输过程中缓冲区后面数据累积的情况。当传输的接收端执行复杂操作或由于任何原因速度较慢时,来自传入源的数据往往会累积,就像堵塞一样。

为了解决这个问题,必须有一个委托系统来确保数据从一个源到另一个源的平滑流动。不同的社区已经根据其程序独特地解决了这个问题,Unix 管道和 TCP 套接字就是很好的例子,通常被称为流量控制。在 Node.js 中,流已被采纳为解决方案。

本指南的目的是进一步详细说明背压是什么,以及流在 Node.js 源代码中是如何解决这个问题的。指南的第二部分将介绍一些建议的最佳实践,以确保你的应用程序代码在实现流时安全且经过优化。

我们假设你对 Node.js 中backpressureBufferEventEmitters的通用定义以及一些Stream的经验略知一二。如果你还没有阅读这些文档,最好先查看一下API 文档,因为它将有助于你在阅读本指南时扩展你的理解。

数据处理的问题

在计算机系统中,数据通过管道、套接字和信号在不同的进程之间传输。在 Node.js 中,我们发现了类似的机制,称为 Stream(流)。流很棒!它们为 Node.js 做了很多事情,内部代码库的几乎每个部分都使用了这个模块。作为开发者,也强烈建议你使用它们!

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('为什么要使用流? ', answer => {
  console.log(`也许是因为 ${answer},也许是因为它们很棒!`)
})

rl.close()

通过流实现的反压机制是一个很好的优化,一个很好的例子可以通过比较 Node.js 流实现的内部系统工具来演示。

在一个场景中,我们将获取一个大型文件(大约 -9 GB)并使用熟悉的 zip(1) 工具对其进行压缩。

bash
zip The.Matrix.1080p.mkv

虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,该脚本使用 Node.js 的模块 zlib,它围绕另一个压缩工具 gzip(1) 进行包装。

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

要测试结果,请尝试打开每个压缩文件。zip(1) 工具压缩的文件会通知你文件已损坏,而 Stream 完成的压缩将无错误地解压缩。

注意

在这个例子中,我们使用 .pipe() 将数据源从一端传递到另一端。但是,请注意没有附加适当的错误处理程序。如果一部分数据未能正确接收,则 Readable 源或 gzip 流将不会被销毁。pump 是一个实用工具,如果管道中的一个流失败或关闭,它将正确地销毁管道中的所有流,在这种情况下是必须的!

pump 仅适用于 Node.js 8.x 或更早版本,对于 Node.js 10.x 或更高版本,引入了 pipeline 来替换 pump。这是一个模块方法,用于在流之间传递错误,并正确清理并在管道完成后提供回调。

以下是使用 pipeline 的示例:

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// 使用 pipeline API 可以轻松地将一系列流连接在一起,并在管道完全完成时收到通知。
// 一个高效地压缩潜在巨大视频文件的管道:
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)

你也可以使用 stream/promises 模块来使用带有 async / await 的 pipeline:

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Pipeline succeeded')
  } catch (err) {
    console.error('Pipeline failed', err)
  }
}

数据过多,速度过快

某些情况下,Readable 流可能会向 Writable 流提供数据速度过快——远超消费者的处理能力!

发生这种情况时,消费者将开始排队等待所有数据块以便稍后处理。写入队列将越来越长,因此必须在内存中保留更多数据,直到整个过程完成。

写入磁盘比读取磁盘慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,由于写入磁盘无法跟上读取速度,就会发生背压。

javascript
// 流暗中表示:“等等,太快了!”
// 当写入尝试跟上传入的数据流时,数据将开始在数据缓冲区的读取端累积。
inp.pipe(gzip).pipe(outputFile)

这就是背压机制很重要的原因。如果没有背压系统,该过程将耗尽系统内存,有效地减慢其他进程的速度,并独占系统的大部分资源,直到完成。

这会导致以下几种情况:

  • 减慢所有其他当前进程的速度
  • 垃圾回收器超负荷工作
  • 内存耗尽

在以下示例中,我们将取出 .write() 函数的返回值并将其更改为 true,这实际上禁用了 Node.js 核心中的背压支持。在任何对 'modified' 二进制文件的引用中,我们指的是运行不包含 return ret; 行的 node 二进制文件,而是替换为 return true; 的二进制文件。

垃圾回收的过度拖累

让我们来看一个简单的基准测试。使用上面相同的例子,我们进行了几次计时试验,以获得两个二进制文件的中间时间。

bash
   试验编号     | `node` 二进制文件 (ms) | 修改后的 `node` 二进制文件 (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
平均时间:     |      55299         |           55975

两者运行时间大约一分钟,所以几乎没有区别,但是让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具 dtrace 来评估 V8 垃圾回收器发生了什么。

GC(垃圾回收器)测量的时段表示垃圾回收器完成单次扫描的完整周期的间隔:

bash
大约时间 (ms) | GC (ms) | 修改后的 GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

虽然这两个进程开始时相同,并且似乎以相同的速率工作 GC,但很明显,在安装了正常工作的背压系统后几秒钟,它将 GC 负载分散到 4-8 毫秒的一致间隔内,直到数据传输结束。

但是,如果没有背压系统,V8 垃圾回收就会开始拖延。正常的二进制文件调用 GC 在一分钟内大约触发 75 次,而修改后的二进制文件只触发 36 次。

这是由于内存使用量增长而逐渐累积的缓慢债务。在没有背压系统的情况下,随着数据的传输,每次块传输都会使用更多的内存。

分配的内存越多,GC 在一次扫描中就需要处理得越多。扫描越大,GC 就越需要决定什么可以释放,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。

内存耗尽

为了确定每个二进制文件的内存消耗,我们使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js 分别对每个进程进行了计时。

这是普通二进制文件的输出:

bash
Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虚拟内存占用的最大字节大小约为 87.81 MB。

现在更改 .write() 函数的返回值,我们得到:

bash
Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虚拟内存占用的最大字节大小约为 1.52 GB。

如果没有就绪的流来委派背压,则分配的内存空间数量级更大——相同进程之间存在巨大的差异!

这个实验展示了 Node.js 背压机制对于您的计算系统的优化和成本效益。现在,让我们分解一下它的工作原理!

反压如何解决这些问题?

有多种函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的内部函数叫做 .pipe()。当然,你也可以使用其他包!但最终,在这个过程的基本层面,我们有两个独立的组件:数据源和消费者。

当从源调用 .pipe() 时,它会向消费者发出信号,表明有数据要传输。pipe 函数有助于为事件触发器设置适当的反压闭包。

在 Node.js 中,源是 Readable 流,消费者是 Writable 流(这两个都可以与 Duplex 或 Transform 流互换,但这超出了本指南的范围)。

反压触发的时刻可以精确地缩小到 Writable.write() 函数的返回值。当然,这个返回值是由一些条件决定的。

在任何数据缓冲区超过 highwaterMark 或写入队列当前繁忙的情况下,.write()返回 false

当返回 false 值时,反压系统启动。它将暂停传入的 Readable 流发送任何数据,并等待消费者再次准备好。一旦数据缓冲区清空,将发出 'drain' 事件并恢复传入的数据流。

队列完成后,反压将允许再次发送数据。正在使用的内存空间将释放自身并准备下一批数据。

这有效地允许在任何给定时间为 .pipe() 函数使用固定数量的内存。不会出现内存泄漏,也不会出现无限缓冲,垃圾收集器只需要处理内存中的一个区域!

所以,如果反压如此重要,为什么你(可能)没有听说过它呢?答案很简单:Node.js 自动为你完成所有这些工作。

这太好了!但当我们试图理解如何实现自定义流时,这也不是那么好。

注意

在大多数机器中,有一个字节大小决定缓冲区何时已满(这在不同的机器上会有所不同)。Node.js 允许你设置自定义的 highWaterMark,但通常情况下,默认设置为 16kb(16384,或 objectMode 流的 16)。如果你想提高这个值,尽管去做,但要谨慎!

.pipe() 的生命周期

为了更好地理解背压,这里有一个关于 Readable 流被 管道化Writable 流的生命周期流程图:

bash
                                                     +===================+
                         x-->  管道函数           +-->   src.pipe(dest)  |
                         x .pipe 方法中     |===================|
                         x     设置。             |  事件回调      |
  +===============+      x                           |-------------------|
  |   你的数据   |      x     它们存在于数据流     | .on('close', cb)  |
  +=======+=======+      x     之外,但           | .on('data', cb)   |
          |              x     重要的是附加了     | .on('drain', cb)  |
          |              x     事件及其           | .on('unpipe', cb) |
+---------v---------+    x     相应的回调函数。 | .on('error', cb)  |
|  Readable  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    这个块太大了吗?  |
  ^       |       |     emit .end();             |    队列繁忙吗?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<    |        |  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  返回 false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            当队列为空时     +============+                         |
  ^------------^-----------------------<  缓冲     |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^缓冲区^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^缓冲区^  |                         |
                                       +------------+   将块添加到队列    |
                                       |            <---^---------------------<
                                       +============+

注意

如果你正在设置一个管道来连接一些流来操作你的数据,你很可能会实现 Transform 流。

在这种情况下,你的 Readable 流的输出将进入 Transform 并管道到 Writable

javascript
Readable.pipe(Transformable).pipe(Writable)

背压将自动应用,但请注意,Transform 流的传入和传出的 highwaterMark 都可以被操作,并将影响背压系统。

背压指南

从 Node.js v0.10 开始,Stream 类就提供了使用这些函数的下划线版本(._read()._write())来修改 .read().write() 行为的能力。

已经有关于实现 Readable 流和实现 Writable 流的文档指南。我们假设您已经阅读过这些指南,下一节将更深入地探讨。

实现自定义流时需遵守的规则

流的黄金法则始终是尊重背压。最佳实践的构成是不矛盾的实践。只要您小心避免与内部背压支持冲突的行为,您就可以确信自己遵循了良好的实践。

一般来说,

  1. 如果没有被请求,就不要 .push()
  2. .write() 返回 false 后不要调用它,而是等待 'drain' 事件。
  3. 不同 Node.js 版本和您使用的库之间,流会有所变化。要小心并进行测试。

注意

关于第 3 点,一个非常有用的构建浏览器流的包是 readable-stream。Rodd Vagg 撰写了一篇精彩的博文 描述了这个库的实用性。简而言之,它为 Readable 流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。

可读流的特定规则

到目前为止,我们已经了解了 .write() 如何影响背压,并且主要关注了可写流。由于 Node.js 的功能,数据在技术上是从可读流向下游流向可写流的。但是,正如我们在任何数据、物质或能量传输中观察到的那样,源与目标同样重要,而可读流对于如何处理背压至关重要。

这两个进程依赖于彼此有效地进行通信,如果可读流忽略了可写流要求它停止发送数据的请求,这就像 .write() 的返回值不正确一样成问题。

因此,除了尊重 .write() 的返回值外,我们还必须尊重 ._read() 方法中使用的 .push() 的返回值。如果 .push() 返回假值,则流将停止从源读取。否则,它将继续运行而不会暂停。

这是一个使用 .push() 的不良实践示例:

javascript
// 这很成问题,因为它完全忽略了 push 的返回值,
// 这可能是来自目标流的背压信号!
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

此外,在自定义流之外,忽略背压也存在陷阱。在这个良好实践的反例中,应用程序的代码强制在数据可用时(由 'data' 事件发出信号)推送数据:

javascript
// 这忽略了 Node.js 已设置的背压机制,
// 并无条件地推送数据,而不管目标流是否已准备好接收它。
readable.on('data', data => writable.write(data))

这是一个使用 .push() 和可读流的示例。

javascript
const { Readable } = require('node:stream')

// 创建一个自定义可读流
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // 将一些数据推送到流中
    this.push({ message: 'Hello, world!' })
    this.push(null) // 标记流的结尾
  },
})

// 使用流
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// 输出:
// { message: 'Hello, world!' }

可写流的特定规则

回想一下,.write() 方法可能会根据某些条件返回 true 或 false。幸运的是,当我们构建自己的可写流时,流状态机将处理我们的回调,并确定何时处理背压并为我们优化数据流。但是,当我们想要直接使用可写流时,必须遵守 .write() 的返回值并密切注意这些条件:

  • 如果写入队列繁忙,.write() 将返回 false。
  • 如果数据块太大,.write() 将返回 false(限制由变量 highWaterMark 指示)。

在这个例子中,我们创建一个自定义的可读流,使用 .push() 将单个对象推送到流中。当流准备好消费数据时,会调用 ._read() 方法,在这种情况下,我们立即将一些数据推送到流中,并通过推送 null 来标记流的结束。

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hello, world!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

然后,我们通过监听 'data' 事件并记录推送到流中的每个数据块来使用流。在这种情况下,我们只向流中推送单个数据块,因此我们只看到一条日志消息。

可写流的特定规则

回想一下,.write() 方法可能会根据某些条件返回 true 或 false。幸运的是,当我们构建自己的可写流时,流状态机将处理我们的回调,并确定何时处理背压并为我们优化数据流。

但是,当我们想直接使用可写流时,必须遵守 .write() 的返回值并密切注意这些条件:

  • 如果写入队列繁忙,.write() 将返回 false。
  • 如果数据块太大,.write() 将返回 false(限制由变量 highWaterMark 指示)。
javascript
class MyWritable extends Writable {
  // 由于 JavaScript 回调的异步特性,此可写流无效。
  // 如果在最后一个回调之前没有为每个回调返回语句,
  // 则很有可能调用多个回调。
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

在实现 ._writev() 时,也有一些需要注意的地方。该函数与 .cork() 耦合,但在编写时有一个常见的错误:

javascript
// 在这里两次使用 .uncork() 会在 C++ 层进行两次调用,从而使 cork/uncork 技术失效。
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()

ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()

// 正确的写法是利用 process.nextTick(),它在下一个事件循环中触发。
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// 作为全局函数。
function doUncork(stream) {
  stream.uncork()
}

.cork() 可以调用任意多次,我们只需要小心调用相同次数的 .uncork() 来使其再次流动。

结论

流是 Node.js 中经常使用的模块。它们对于内部结构很重要,对于开发者来说,可以扩展和连接 Node.js 模块生态系统。

希望你现在能够考虑到背压,安全地编写你自己的 WritableReadable 流并进行故障排除,并与同事和朋友分享你的知识。

务必阅读更多关于 Stream 的内容,了解其他 API 函数,以帮助改进和释放你在使用 Node.js 构建应用程序时的流处理能力。