Skip to content

Противодавление в потоках

Существует общая проблема, возникающая при обработке данных, которая называется противодавлением (backpressure) и описывает накопление данных за буфером во время передачи данных. Когда принимающий конец передачи выполняет сложные операции или по какой-либо причине работает медленнее, данные из входящего источника имеют тенденцию накапливаться, подобно засору.

Для решения этой проблемы должна быть реализована система делегирования, обеспечивающая плавный поток данных из одного источника в другой. Различные сообщества решили эту проблему уникальным для своих программ способом. Хорошими примерами являются каналы Unix и сокеты TCP, которые часто упоминаются как управление потоком. В Node.js решением стали потоки.

Цель этого руководства — более подробно описать, что такое противодавление и как именно потоки решают эту проблему в исходном коде Node.js. Во второй части руководства будут представлены рекомендуемые лучшие практики, обеспечивающие безопасность и оптимизацию кода вашего приложения при реализации потоков.

Мы предполагаем некоторую осведомленность с общим определением backpressure, Buffer и EventEmitters в Node.js, а также некоторый опыт работы с Stream. Если вы не читали эти документы, неплохо будет взглянуть на API-документацию, так как это поможет расширить ваше понимание при чтении этого руководства.

Проблема обработки данных

В компьютерной системе данные передаются из одного процесса в другой через каналы, сокеты и сигналы. В Node.js мы находим аналогичный механизм, называемый Stream. Потоки великолепны! Они делают так много для Node.js, и почти каждая часть внутреннего кода использует этот модуль. Как разработчик, вы более чем поощряетесь использовать их тоже!

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('Почему вы должны использовать потоки? ', answer => {
  console.log(`Возможно, это ${answer}, а может быть, потому что они потрясающие!`)
})

rl.close()

Хороший пример того, почему механизм противодавления, реализованный через потоки, является отличной оптимизацией, можно продемонстрировать, сравнив внутренние системные инструменты реализации потоков Node.js.

В одном сценарии мы возьмем большой файл (примерно -9 ГБ) и сжмем его с помощью знакомого инструмента zip(1).

bash
zip The.Matrix.1080p.mkv

Хотя это займет несколько минут, в другой оболочке мы можем запустить скрипт, который использует модуль Node.js zlib, который оборачивается вокруг другого инструмента сжатия, gzip(1).

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

Чтобы проверить результаты, попробуйте открыть каждый сжатый файл. Файл, сжатый инструментом zip(1), уведомит вас о том, что файл поврежден, тогда как сжатие, завершенное Stream, будет распаковано без ошибок.

Примечание

В этом примере мы используем .pipe(), чтобы получить источник данных от одного конца до другого. Однако обратите внимание, что к нему не подключены обработчики ошибок. Если какой-либо фрагмент данных не будет должным образом принят, источник Readable или поток gzip не будет уничтожен. pump — это служебная утилита, которая должным образом уничтожит все потоки в конвейере, если один из них выйдет из строя или закроется, и в этом случае она необходима!

pump необходим только для Node.js 8.x или более ранних версий, так как для Node.js 10.x или более поздних версий pipeline введен для замены pump. Это метод модуля для передачи данных между потоками, перенаправления ошибок и правильной очистки, а также предоставления обратного вызова по завершении конвейера.

Вот пример использования pipeline:

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Используйте API pipeline для легкой передачи серии потоков
// вместе и получения уведомления о полном завершении конвейера.
// Конвейер для эффективной gzip-компрессии потенциально огромного видеофайла:
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Конвейер завершился с ошибкой', err)
    } else {
      console.log('Конвейер завершился успешно')
    }
  }
)

Вы также можете использовать модуль stream/promises для использования pipeline с async / await:

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Конвейер завершился успешно')
  } catch (err) {
    console.error('Конвейер завершился с ошибкой', err)
  }
}

Слишком много данных, слишком быстро

Существуют ситуации, когда поток Readable может передавать данные в поток Writable слишком быстро — гораздо быстрее, чем потребитель может их обработать!

В этом случае потребитель начнет накапливать все фрагменты данных для последующего использования. Очередь записи будет становиться все длиннее и длиннее, и из-за этого больше данных нужно будет хранить в памяти до завершения всего процесса.

Запись на диск происходит гораздо медленнее, чем чтение с диска, поэтому, когда мы пытаемся сжать файл и записать его на жесткий диск, возникнет противодавление, потому что диск записи не сможет справиться со скоростью чтения.

javascript
// Втайне поток говорит: "подожди, подожди! это слишком много!"
// Данные начнут накапливаться на стороне чтения буфера данных, так как
// запись пытается не отставать от поступающего потока данных.
inp.pipe(gzip).pipe(outputFile)

Вот почему важен механизм противодавления. Если бы системы противодавления не было, процесс израсходовал бы память вашей системы, эффективно замедляя другие процессы и монополизируя большую часть вашей системы до завершения.

Это приводит к нескольким последствиям:

  • Замедление всех текущих процессов
  • Очень перегруженный сборщик мусора
  • Истечение памяти

В следующих примерах мы будем использовать возвращаемое значение функции .write() и изменять его на true, что фактически отключает поддержку противодавления в ядре Node.js. В любом упоминании бинарного файла 'modified' мы говорим о запуске бинарного файла node без строки return ret;, а вместо нее используется return true;.

Чрезмерная нагрузка на сборщик мусора

Давайте взглянем на быстрый бенчмарк. Используя тот же пример, что и выше, мы провели несколько тестовых запусков, чтобы получить медианное время для обоих бинарных файлов.

bash
   прогон (#)  | бинарный файл `node` (мс) | модифицированный бинарный файл `node` (мс)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
среднее время: |      55299         |           55975

Оба занимают около минуты, поэтому особой разницы нет, но давайте посмотрим поближе, чтобы подтвердить, верны ли наши подозрения. Мы используем утилиту Linux dtrace для оценки того, что происходит со сборщиком мусора V8.

Время работы сборщика мусора (GC) указывает интервалы полного цикла одной очистки, выполненной сборщиком мусора:

bash
прибл. время (мс) | GC (мс) | модифицированный GC (мс)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

Хотя два процесса начинаются одинаково и, кажется, работают с GC с одинаковой скоростью, становится очевидным, что через несколько секунд при правильно работающей системе противодавления она распределяет нагрузку GC по последовательным интервалам 4-8 миллисекунд до конца передачи данных.

Однако, когда система противодавления отсутствует, сбор мусора V8 начинает тормозить. Обычный бинарный файл вызывал GC примерно 75 раз в минуту, тогда как модифицированный бинарный файл вызывал его только 36 раз.

Это медленный и постепенный рост долга, накапливающийся из-за растущего использования памяти. По мере передачи данных без системы противодавления для каждой передачи фрагмента используется больше памяти.

Чем больше памяти выделяется, тем большему объему работы должен уделить внимание GC за одну очистку. Чем больше очистка, тем большему объему работы должен уделить внимание GC для определения того, что можно освободить, и сканирование отсоединенных указателей в большем пространстве памяти будет потреблять больше вычислительной мощности.

Истечение памяти

Для определения потребления памяти каждым двоичным файлом мы зафиксировали каждый процесс с помощью /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js по отдельности.

Вот вывод для обычного двоичного файла:

bash
Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

Максимальный размер в байтах, занимаемый виртуальной памятью, составляет приблизительно 87,81 Мб.

А теперь, изменив возвращаемое значение функции .write(), мы получаем:

bash
Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

Максимальный размер в байтах, занимаемый виртуальной памятью, составляет приблизительно 1,52 Гб.

Без потоков, делегирующих противодавление, выделяется на порядок больше памяти — огромная разница между одним и тем же процессом!

Этот эксперимент показывает, насколько оптимизирован и эффективен механизм противодавления Node.js для вашей вычислительной системы. Теперь давайте разберем, как он работает!

Как Backpressure Решает Эти Проблемы?

Существуют различные функции для передачи данных из одного процесса в другой. В Node.js есть встроенная функция .pipe(). Существуют и другие пакеты, которые вы можете использовать! В конечном итоге, на базовом уровне этого процесса у нас есть два отдельных компонента: источник данных и потребитель.

Когда .pipe() вызывается из источника, он сигнализирует потребителю о наличии данных для передачи. Функция pipe помогает настроить соответствующие механизмы обратного давления для триггеров событий.

В Node.js источником является поток Readable, а потребителем — поток Writable (оба могут быть заменены потоком Duplex или Transform, но это выходит за рамки данного руководства).

Момент срабатывания обратного давления можно точно определить по возвращаемому значению функции .write() потока Writable. Это возвращаемое значение, конечно, определяется несколькими условиями.

В любом сценарии, когда буфер данных превысил highwaterMark или очередь записи занята, .write() вернет false.

Когда возвращается значение false, включается система обратного давления. Она приостанавливает входящий поток Readable от отправки любых данных и ожидает, пока потребитель снова не будет готов. После очистки буфера данных будет отправлено событие 'drain', и возобновится поток входящих данных.

После завершения очереди обратное давление позволит снова отправлять данные. Занятое место в памяти освободится и подготовится к следующей порции данных.

Это эффективно позволяет использовать фиксированный объем памяти в любой момент времени для функции .pipe(). Не будет утечки памяти и бесконечного буферизации, и сборщику мусора нужно будет обрабатывать только одну область памяти!

Итак, если обратное давление так важно, почему вы (вероятно) о нем не слышали? Ответ прост: Node.js делает все это автоматически за вас.

Это здорово! Но и не очень здорово, когда мы пытаемся понять, как реализовать собственные потоки.

ПРИМЕЧАНИЕ

В большинстве машин существует размер в байтах, определяющий, когда буфер заполнен (он будет варьироваться на разных машинах). Node.js позволяет установить собственный highWaterMark, но обычно по умолчанию устанавливается значение 16 КБ (16384 или 16 для потоков objectMode). В случаях, когда вы захотите увеличить это значение, сделайте это, но с осторожностью!

Жизненный цикл .pipe()

Для лучшего понимания противодавления, вот блок-схема жизненного цикла потока Readable, передаваемого (piped) в поток Writable:

bash
                                                     +===================+
                         x-->  Функции передачи    +-->   src.pipe(dest)  |
                         x     настраиваются во   |===================|
                         x     время метода .pipe.  |  Обработчики событий|
  +===============+      x                           |-------------------|
  |   Ваши данные  |      x     Они существуют вне    | .on('close', cb)  |
  +=======+=======+      x     потока данных, но    | .on('data', cb)   |
          |              x     важно, что они       | .on('drain', cb)  |
          |              x     прикрепляют события  | .on('unpipe', cb) |
+---------v---------+    x     и соответствующие    | .on('error', cb)  |
|  Поток Readable  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Поток Writable  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> если (!chunk)                |    Этот фрагмент слишком большой?  |
  ^       |       |     выдать .end();             |    Очередь занята?      |
  |       |       +-> иначе                       +-------+----------------+---+
  |       ^       |     выдать .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  Нет  |        |  Да  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               выдать .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            когда очередь пуста     +============+                         |
  ^------------^-----------------------<  Буферизация |                         |
               |                       |============|                         |
               +> выдать .drain();       |  ^Буфер^  |                         |
               +> выдать .resume();      +------------+                         |
                                       |  ^Буфер^  |                         |
                                       +------------+   добавить фрагмент в очередь    |
                                       |            <---^---------------------<
                                       +============+

ПРИМЕЧАНИЕ

Если вы настраиваете конвейер для объединения нескольких потоков для обработки ваших данных, вы, скорее всего, будете реализовывать поток Transform.

В этом случае ваш вывод из потока Readable поступит в Transform и будет передан в Writable.

javascript
Readable.pipe(Transformable).pipe(Writable)

Противодавление будет применено автоматически, но обратите внимание, что как входящий, так и исходящий highwaterMark потока Transform могут быть изменены, и это повлияет на систему противодавления.

Рекомендации по противодавлению

Начиная с Node.js v0.10, класс Stream предоставляет возможность изменять поведение .read() или .write(), используя версии этих функций с подчеркиванием (._read() и ._write()).

Существуют документированные рекомендации по реализации потоков Readable и Writable. Мы предположим, что вы с ними ознакомлены, и следующий раздел углубится в детали.

Правила, которых следует придерживаться при реализации пользовательских потоков

Золотое правило потоков — всегда учитывать противодавление. Лучшая практика — это непротиворечивая практика. Пока вы осторожны и избегаете поведения, конфликтующего с внутренней поддержкой противодавления, вы можете быть уверены, что следуете хорошей практике.

В общем:

  1. Никогда не вызывайте .push(), если вас об этом не просят.
  2. Никогда не вызывайте .write(), если он возвращает false, а вместо этого ждите события 'drain'.
  3. Потоки меняются между различными версиями Node.js и используемыми библиотеками. Будьте осторожны и тестируйте.

NOTE

Что касается пункта 3, невероятно полезным пакетом для построения потоков в браузере является readable-stream. Родд Вагг написал отличный пост в блоге, описывающий полезность этой библиотеки. Вкратце, она обеспечивает своего рода автоматическую грациозную деградацию для потоков Readable и поддерживает старые версии браузеров и Node.js.

Правила, специфичные для потоков Readable

До сих пор мы рассматривали, как .write() влияет на противодавление, и сосредоточились на потоке Writable. Благодаря функциональности Node.js данные технически передаются вниз по потоку от Readable к Writable. Однако, как мы можем наблюдать в любой передаче данных, вещества или энергии, источник так же важен, как и назначение, и поток Readable имеет решающее значение для обработки противодавления.

Оба этих процесса зависят друг от друга для эффективной коммуникации; если Readable игнорирует запрос потока Writable прекратить отправку данных, это может быть так же проблематично, как и некорректное значение, возвращаемое .write().

Поэтому, помимо уважения возвращаемого значения .write(), мы также должны уважать возвращаемое значение .push(), используемое в методе ._read(). Если .push() возвращает false, поток прекратит чтение из источника. В противном случае он будет продолжать работу без остановки.

Вот пример плохой практики использования .push():

javascript
// Это проблематично, так как полностью игнорируется возвращаемое значение push,
// которое может быть сигналом противодавления от целевого потока!
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

Кроме того, за пределами пользовательского потока существуют подводные камни, связанные с игнорированием противодавления. В этом контрпримере хорошей практики код приложения принудительно пропускает данные всякий раз, когда они доступны (сигнализируется событием 'data'):

javascript
// Это игнорирует механизмы противодавления, установленные в Node.js,
// и безусловно передает данные, независимо от того, готов ли
// целевой поток к их принятию или нет.
readable.on('data', data => writable.write(data))

Вот пример использования .push() с потоком Readable.

javascript
const { Readable } = require('node:stream')

// Создаем пользовательский поток Readable
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Передаем некоторые данные в поток
    this.push({ message: 'Hello, world!' })
    this.push(null) // Отмечаем конец потока
  },
})

// Потребляем поток
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// Вывод:
// { message: 'Hello, world!' }

Правила, специфичные для потоков Writable

Напоминаем, что .write() может возвращать true или false в зависимости от некоторых условий. К счастью для нас, при создании собственного потока Writable, машина состояний потока будет обрабатывать наши обратные вызовы и определять, когда обрабатывать противодавление и оптимизировать поток данных для нас. Однако, когда мы хотим использовать Writable напрямую, мы должны учитывать возвращаемое значение .write() и внимательно следить за этими условиями:

  • Если очередь записи занята, .write() вернет false.
  • Если фрагмент данных слишком большой, .write() вернет false (предел указывается переменной highWaterMark).

В этом примере мы создаем пользовательский поток Readable, который отправляет один объект в поток с помощью .push(). Метод ._read() вызывается, когда поток готов потреблять данные, и в этом случае мы немедленно отправляем некоторые данные в поток и помечаем конец потока, отправляя null.

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hello, world!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

Затем мы потребляем поток, прослушивая событие 'data' и регистрируя каждый фрагмент данных, отправляемый в поток. В этом случае мы отправляем в поток только один фрагмент данных, поэтому мы видим только одно сообщение в журнале.

Правила, специфичные для потоков Writable

Напоминаем, что .write() может возвращать true или false в зависимости от некоторых условий. К счастью для нас, при создании собственного потока Writable, машина состояний потока будет обрабатывать наши обратные вызовы и определять, когда обрабатывать противодавление и оптимизировать поток данных для нас.

Однако, когда мы хотим использовать Writable напрямую, мы должны учитывать возвращаемое значение .write() и внимательно следить за этими условиями:

  • Если очередь записи занята, .write() вернет false.
  • Если фрагмент данных слишком большой, .write() вернет false (предел указывается переменной highWaterMark).
javascript
class MyWritable extends Writable {
  // Этот writable недействителен из-за асинхронной природы обратных вызовов JavaScript.
  // Без оператора return для каждого обратного вызова перед последним,
  // велика вероятность вызова нескольких обратных вызовов.
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

Также есть некоторые моменты, на которые следует обратить внимание при реализации ._writev(). Функция связана с .cork(), но существует распространенная ошибка при записи:

javascript
// Двукратное использование .uncork() здесь приводит к двум вызовам на уровне C++, что делает бесполезной технику cork/uncork.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()

ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()

// Правильный способ записи — использовать process.nextTick(), который срабатывает
// на следующем цикле событий.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// Как глобальная функция.
function doUncork(stream) {
  stream.uncork()
}

.cork() можно вызывать сколько угодно раз, нам просто нужно быть осторожными, чтобы вызывать .uncork() столько же раз, чтобы снова запустить поток.

Заключение

Потоки — часто используемый модуль в Node.js. Они важны для внутренней структуры и для разработчиков, чтобы расширять и связывать экосистему модулей Node.js.

Надеюсь, теперь вы сможете устранять неполадки и безопасно писать собственные потоки Writable и Readable с учетом противодавления, а также делиться своими знаниями с коллегами и друзьями.

Обязательно ознакомьтесь с дополнительной информацией о Stream для других функций API, чтобы улучшить и раскрыть возможности потоковой передачи при создании приложения с использованием Node.js.