Skip to content

Contrapressão em Streams

Existe um problema geral que ocorre durante o tratamento de dados chamado contrapressão e descreve um acúmulo de dados atrás de um buffer durante a transferência de dados. Quando a extremidade receptora da transferência possui operações complexas ou é mais lenta por algum motivo, há uma tendência de os dados da fonte de entrada se acumularem, como um entupimento.

Para resolver esse problema, deve haver um sistema de delegação em vigor para garantir um fluxo suave de dados de uma fonte para outra. Diferentes comunidades resolveram essa questão de forma única para seus programas; pipes Unix e sockets TCP são bons exemplos disso e são frequentemente referidos como controle de fluxo. No Node.js, as streams foram a solução adotada.

O objetivo deste guia é detalhar ainda mais o que é contrapressão e como exatamente as streams abordam isso no código-fonte do Node.js. A segunda parte do guia apresentará as melhores práticas sugeridas para garantir que o código do seu aplicativo seja seguro e otimizado ao implementar streams.

Supomos um pouco de familiaridade com a definição geral de backpressure, Buffer e EventEmitters no Node.js, bem como alguma experiência com Stream. Se você não leu esses documentos, não é uma má ideia dar uma olhada na documentação da API primeiro, pois isso ajudará a expandir sua compreensão ao ler este guia.

O Problema com o Tratamento de Dados

Em um sistema de computador, os dados são transferidos de um processo para outro por meio de pipes, sockets e sinais. No Node.js, encontramos um mecanismo semelhante chamado Stream. Streams são ótimas! Elas fazem muito pelo Node.js e quase todas as partes do código-base interno utilizam esse módulo. Como desenvolvedor, você é mais do que encorajado a usá-las também!

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('Por que você deve usar streams? ', answer => {
  console.log(`Talvez seja ${answer}, talvez seja porque elas são incríveis!`)
})

rl.close()

Um bom exemplo de por que o mecanismo de contrapressão implementado por meio de streams é uma ótima otimização pode ser demonstrado comparando as ferramentas do sistema interno da implementação do Stream do Node.js.

Em um cenário, pegaremos um arquivo grande (aproximadamente -9 GB) e o compactaremos usando a ferramenta familiar zip(1).

bash
zip The.Matrix.1080p.mkv

Embora isso leve alguns minutos para ser concluído, em outro shell podemos executar um script que usa o módulo zlib do Node.js, que envolve outra ferramenta de compressão, gzip(1).

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

Para testar os resultados, tente abrir cada arquivo compactado. O arquivo compactado pela ferramenta zip(1) notificará que o arquivo está corrompido, enquanto a compressão finalizada pelo Stream será descompactada sem erros.

Nota

Neste exemplo, usamos .pipe() para obter a fonte de dados de uma extremidade para outra. No entanto, observe que não há manipuladores de erros apropriados anexados. Se um bloco de dados não for recebido corretamente, a fonte Readable ou a stream gzip não serão destruídas. pump é uma ferramenta utilitária que destruiria corretamente todas as streams em um pipeline se uma delas falhar ou fechar, e é essencial nesse caso!

pump só é necessário para o Node.js 8.x ou anterior, pois para o Node.js 10.x ou versão posterior, pipeline é introduzido para substituir pump. Este é um método de módulo para pipe entre streams que encaminham erros e limpam adequadamente e fornecem um callback quando o pipeline estiver completo.

Aqui está um exemplo de uso do pipeline:

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Use a API de pipeline para facilmente pipear uma série de streams
// juntas e receber notificação quando o pipeline estiver totalmente concluído.
// Um pipeline para compactar um arquivo de vídeo potencialmente enorme de forma eficiente:
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline falhou', err)
    } else {
      console.log('Pipeline concluído com sucesso')
    }
  }
)

Você também pode usar o módulo stream/promises para usar o pipeline com async / await:

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Pipeline concluído com sucesso')
  } catch (err) {
    console.error('Pipeline falhou', err)
  }
}

Dados Demais, Muito Rapidamente

Existem instâncias em que uma stream Readable pode fornecer dados para a Writable muito rapidamente – muito mais do que o consumidor pode lidar!

Quando isso ocorre, o consumidor começará a colocar em fila todos os blocos de dados para consumo posterior. A fila de gravação ficará cada vez maior e, devido a isso, mais dados precisam ser mantidos na memória até que todo o processo tenha sido concluído.

Gravar em um disco é muito mais lento do que ler de um disco, portanto, quando estamos tentando comprimir um arquivo e gravá-lo em nosso disco rígido, ocorrerá contrapressão porque o disco de gravação não será capaz de acompanhar a velocidade da leitura.

javascript
// Secretamente, o stream está dizendo: "uau, uau! espere, isso é demais!"
// Os dados começarão a se acumular no lado de leitura do buffer de dados enquanto
// a gravação tenta acompanhar o fluxo de dados de entrada.
inp.pipe(gzip).pipe(outputFile)

É por isso que um mecanismo de contrapressão é importante. Se um sistema de contrapressão não estivesse presente, o processo consumiria a memória do seu sistema, diminuindo efetivamente outros processos e monopolizando uma grande parte do seu sistema até a conclusão.

Isso resulta em algumas coisas:

  • Lentidão de todos os outros processos atuais
  • Um coletor de lixo muito sobrecarregado
  • Esgotamento de memória

Nos exemplos a seguir, removeremos o valor retornado da função .write() e o alteraremos para true, o que desabilita efetivamente o suporte de contrapressão no núcleo do Node.js. Em qualquer referência ao binário 'modified', estamos falando em executar o binário do node sem a linha return ret;, e sim com o return true; substituído.

Excesso de Atraso na Coleta de Lixo

Vamos dar uma olhada em um benchmark rápido. Usando o mesmo exemplo acima, executamos alguns testes de tempo para obter um tempo mediano para ambos os binários.

bash
   tentativa (#)  | binário `node` (ms) | binário `node` modificado (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
tempo médio: |      55299         |           55975

Ambos levam cerca de um minuto para executar, então não há muita diferença, mas vamos dar uma olhada mais de perto para confirmar se nossas suspeitas estão corretas. Usamos a ferramenta Linux dtrace para avaliar o que está acontecendo com o coletor de lixo V8.

O tempo medido do GC (coletor de lixo) indica os intervalos de um ciclo completo de uma única varredura feita pelo coletor de lixo:

bash
tempo aproximado (ms) | GC (ms) | GC modificado (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

Enquanto os dois processos começam iguais e parecem trabalhar o GC na mesma taxa, fica evidente que após alguns segundos com um sistema de contrapressão funcionando corretamente, ele espalha a carga do GC em intervalos consistentes de 4-8 milissegundos até o final da transferência de dados.

No entanto, quando um sistema de contrapressão não está em vigor, a coleta de lixo do V8 começa a se arrastar. O binário normal chamado GC dispara aproximadamente 75 vezes em um minuto, enquanto o binário modificado dispara apenas 36 vezes.

Esta é a dívida lenta e gradual que se acumula com o aumento do uso da memória. À medida que os dados são transferidos, sem um sistema de contrapressão em vigor, mais memória está sendo usada para cada transferência de bloco.

Quanto mais memória está sendo alocada, mais o GC precisa cuidar em uma única varredura. Quanto maior a varredura, mais o GC precisa decidir o que pode ser liberado, e a varredura de ponteiros desligados em um espaço de memória maior consumirá mais poder de computação.

Esgotamento de Memória

Para determinar o consumo de memória de cada binário, cronometramos cada processo individualmente com /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js.

Esta é a saída no binário normal:

bash
Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

O tamanho máximo em bytes ocupado pela memória virtual resulta em aproximadamente 87,81 MB.

E agora, alterando o valor de retorno da função .write(), obtemos:

bash
Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

O tamanho máximo em bytes ocupado pela memória virtual resulta em aproximadamente 1,52 GB.

Sem streams para delegar a contrapressão, há uma ordem de magnitude maior de espaço de memória sendo alocado - uma enorme margem de diferença entre o mesmo processo!

Este experimento mostra o quão otimizado e econômico é o mecanismo de contrapressão do Node.js para o seu sistema de computação. Agora, vamos fazer uma análise de como ele funciona!

Como a Contrapressão Resolve Esses Problemas?

Existem diferentes funções para transferir dados de um processo para outro. No Node.js, existe uma função interna embutida chamada .pipe(). Existem outros pacotes por aí que você também pode usar! No entanto, no nível básico desse processo, temos dois componentes separados: a fonte dos dados e o consumidor.

Quando .pipe() é chamado da fonte, ele sinaliza ao consumidor que há dados a serem transferidos. A função pipe ajuda a configurar os fechamentos de contrapressão apropriados para os gatilhos de eventos.

No Node.js, a fonte é um fluxo Readable e o consumidor é o fluxo Writable (ambos podem ser trocados por um fluxo Duplex ou Transform, mas isso está fora do escopo deste guia).

O momento em que a contrapressão é acionada pode ser reduzido exatamente ao valor de retorno da função .write() de um Writable. Esse valor de retorno é determinado por algumas condições, é claro.

Em qualquer cenário em que o buffer de dados excedeu o highwaterMark ou a fila de gravação está atualmente ocupada, .write() retornará false.

Quando um valor false é retornado, o sistema de contrapressão entra em ação. Ele pausará o fluxo Readable de entrada de enviar quaisquer dados e aguardará até que o consumidor esteja pronto novamente. Assim que o buffer de dados for esvaziado, um evento 'drain' será emitido e o fluxo de dados de entrada será retomado.

Assim que a fila terminar, a contrapressão permitirá que os dados sejam enviados novamente. O espaço na memória que estava sendo usado será liberado e preparado para o próximo lote de dados.

Isso permite efetivamente que uma quantidade fixa de memória seja usada a qualquer momento para uma função .pipe(). Não haverá vazamento de memória, nem buffer infinito, e o coletor de lixo só terá que lidar com uma área na memória!

Então, se a contrapressão é tão importante, por que você (provavelmente) não ouviu falar dela? Bem, a resposta é simples: o Node.js faz tudo isso automaticamente para você.

Isso é ótimo! Mas também não é tão bom quando estamos tentando entender como implementar nossos fluxos personalizados.

NOTA

Na maioria das máquinas, existe um tamanho em bytes que determina quando um buffer está cheio (que varia entre máquinas diferentes). O Node.js permite que você defina seu highWaterMark personalizado, mas comumente, o padrão é definido como 16 kb (16384 ou 16 para fluxos objectMode). Em casos em que você pode querer aumentar esse valor, faça isso, mas com cautela!

Ciclo de vida do .pipe()

Para alcançar uma melhor compreensão da contrapressão, aqui está um fluxograma do ciclo de vida de um fluxo Readable sendo encaixado em um fluxo Writable:

bash
                                                     +===================+
                         x-->  Funções de encaixamento   +-->   src.pipe(dest)  |
                         x     são configuradas durante     |===================|
                         x     o método .pipe.     |  Callbacks de eventos  |
  +===============+      x                           |-------------------|
  |   Seus Dados   |      x     Eles existem fora     | .on('close', cb)  |
  +=======+=======+      x     do fluxo de dados, mas    | .on('data', cb)   |
          |              x     importantemente anexam    | .on('drain', cb)  |
          |              x     eventos, e seus     | .on('unpipe', cb) |
+---------v---------+    x     respectivos callbacks. | .on('error', cb)  |
|  Fluxo Readable  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Fluxo Writable  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Este chunk é muito grande?  |
  ^       |       |     emitir .end();             |    A fila está ocupada?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emitir .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  Não  |        |  Sim  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emitir .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            quando a fila está vazia     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emitir .drain();       |  ^Buffer^  |                         |
               +> emitir .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   adicionar chunk à fila    |
                                       |            <---^---------------------<
                                       +============+

NOTA

Se você estiver configurando um pipeline para concatenar alguns fluxos para manipular seus dados, provavelmente estará implementando um fluxo Transform.

Neste caso, sua saída do seu fluxo Readable entrará no Transform e será encaixada no Writable.

javascript
Readable.pipe(Transformable).pipe(Writable)

A contrapressão será aplicada automaticamente, mas observe que tanto o highwaterMark de entrada quanto o de saída do fluxo Transform podem ser manipulados e afetarão o sistema de contrapressão.

Diretrizes de Contrapressão

Desde o Node.js v0.10, a classe Stream oferece a capacidade de modificar o comportamento de .read() ou .write() usando a versão com sublinhado dessas funções respectivas (._read() e ._write()).

Existem diretrizes documentadas para implementar streams Readable e implementar streams Writable. Assumiremos que você leu essas diretrizes, e a próxima seção entrará um pouco mais em detalhes.

Regras a serem seguidas ao implementar Streams personalizadas

A regra de ouro dos streams é sempre respeitar a contrapressão. O que constitui uma boa prática é uma prática não contraditória. Contanto que você tenha cuidado para evitar comportamentos que conflitam com o suporte interno de contrapressão, você pode ter certeza de que está seguindo boas práticas.

Em geral,

  1. Nunca .push() se você não for solicitado.
  2. Nunca chame .write() depois que ele retornar false, mas espere por 'drain' em vez disso.
  3. As mudanças de Streams entre diferentes versões do Node.js e a biblioteca que você usa. Seja cuidadoso e teste as coisas.

NOTA

Em relação ao ponto 3, um pacote incrivelmente útil para construir streams de navegador é readable-stream. Rodd Vagg escreveu um ótimo post no blog descrevendo a utilidade desta biblioteca. Em resumo, ele fornece um tipo de degradação graciosa automatizada para streams Readable e suporta versões mais antigas de navegadores e Node.js.

Regras específicas para Streams Readable

Até agora, analisamos como .write() afeta a contrapressão e nos concentramos muito no stream Writable. Devido à funcionalidade do Node.js, os dados fluem tecnicamente a jusante de Readable para Writable. No entanto, como podemos observar em qualquer transmissão de dados, matéria ou energia, a fonte é tão importante quanto o destino, e o stream Readable é vital para como a contrapressão é manipulada.

Ambos os processos dependem um do outro para se comunicar de forma eficaz, se o Readable ignorar quando o stream Writable pedir que ele pare de enviar dados, pode ser tão problemático quanto quando o valor de retorno de .write() estiver incorreto.

Portanto, além de respeitar o retorno de .write(), também devemos respeitar o valor de retorno de .push() usado no método ._read(). Se .push() retornar um valor falso, o stream parará de ler da fonte. Caso contrário, continuará sem pausa.

Aqui está um exemplo de má prática usando .push():

javascript
// Isso é problemático, pois ignora completamente o valor de retorno do push
// que pode ser um sinal de contrapressão do stream de destino!
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

Além disso, de fora do stream personalizado, existem armadilhas para ignorar a contrapressão. Neste contra-exemplo de boa prática, o código do aplicativo força os dados a passar sempre que estiverem disponíveis (sinalizado pelo evento 'data'):

javascript
// Isso ignora os mecanismos de contrapressão que o Node.js implementou,
// e incondicionalmente envia dados, independentemente de o
// stream de destino estar pronto para recebê-los ou não.
readable.on('data', data => writable.write(data))

Aqui está um exemplo de uso de .push() com um stream Readable.

javascript
const { Readable } = require('node:stream')

// Crie um stream Readable personalizado
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Envie alguns dados para o stream
    this.push({ message: 'Hello, world!' })
    this.push(null) // Marque o fim do stream
  },
})

// Consuma o stream
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// Saída:
// { message: 'Hello, world!' }

Regras específicas para fluxos graváveis (Writable Streams)

Lembre-se que um .write() pode retornar verdadeiro ou falso dependendo de algumas condições. Felizmente para nós, ao construir nosso próprio fluxo gravável, a máquina de estado do fluxo irá lidar com nossos callbacks e determinar quando lidar com backpressure e otimizar o fluxo de dados para nós. No entanto, quando queremos usar um Writable diretamente, devemos respeitar o valor de retorno do .write() e prestar atenção a estas condições:

  • Se a fila de escrita estiver ocupada, .write() retornará falso.
  • Se o chunk de dados for muito grande, .write() retornará falso (o limite é indicado pela variável highWaterMark).

Neste exemplo, criamos um fluxo Readable personalizado que insere um único objeto no fluxo usando .push(). O método ._read() é chamado quando o fluxo está pronto para consumir dados e, neste caso, imediatamente inserimos alguns dados no fluxo e marcamos o fim do fluxo inserindo null.

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hello, world!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

Então consumimos o fluxo ouvindo o evento 'data' e registrando cada chunk de dados que é inserido no fluxo. Neste caso, só inserimos um único chunk de dados no fluxo, então só vemos uma mensagem de log.

Regras específicas para fluxos graváveis (Writable Streams)

Lembre-se que um .write() pode retornar verdadeiro ou falso dependendo de algumas condições. Felizmente para nós, ao construir nosso próprio fluxo gravável, a máquina de estado do fluxo irá lidar com nossos callbacks e determinar quando lidar com backpressure e otimizar o fluxo de dados para nós.

No entanto, quando queremos usar um Writable diretamente, devemos respeitar o valor de retorno do .write() e prestar atenção a estas condições:

  • Se a fila de escrita estiver ocupada, .write() retornará falso.
  • Se o chunk de dados for muito grande, .write() retornará falso (o limite é indicado pela variável highWaterMark).
javascript
class MyWritable extends Writable {
  // Este writable é inválido devido à natureza assíncrona dos callbacks do JavaScript.
  // Sem uma instrução de retorno para cada callback antes do último,
  // há uma grande chance de múltiplos callbacks serem chamados.
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

Há também algumas coisas para observar ao implementar ._writev(). A função é acoplada com .cork(), mas há um erro comum ao escrever:

javascript
// Usar .uncork() duas vezes aqui faz duas chamadas na camada C++, tornando a
// técnica cork/uncork inútil.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()

ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()

// A maneira correta de escrever isto é utilizar process.nextTick(), que dispara
// no próximo loop de eventos.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// Como uma função global.
function doUncork(stream) {
  stream.uncork()
}

.cork() pode ser chamado quantas vezes quisermos, só precisamos ter cuidado para chamar .uncork() a mesma quantidade de vezes para fazê-lo fluir novamente.

Conclusão

Streams são um módulo frequentemente usado em Node.js. Eles são importantes para a estrutura interna e, para os desenvolvedores, para expandir e conectar-se através do ecossistema de módulos Node.js.

Esperançosamente, você agora poderá solucionar problemas e codificar com segurança seus próprios streams Writable e Readable com a contrapressão em mente, e compartilhar seu conhecimento com colegas e amigos.

Certifique-se de ler mais sobre Stream para outras funções da API para ajudar a melhorar e liberar suas capacidades de streaming ao construir um aplicativo com Node.js.