Contrapresión en Streams
Existe un problema general que ocurre durante el manejo de datos llamado contrapresión y describe una acumulación de datos detrás de un búfer durante la transferencia de datos. Cuando el extremo receptor de la transferencia tiene operaciones complejas, o es más lento por cualquier motivo, existe una tendencia a que los datos de la fuente entrante se acumulen, como un atasco.
Para resolver este problema, debe existir un sistema de delegación para asegurar un flujo suave de datos de una fuente a otra. Diferentes comunidades han resuelto este problema de manera única para sus programas; las tuberías de Unix y los sockets TCP son buenos ejemplos de esto y a menudo se les denomina control de flujo. En Node.js, los streams han sido la solución adoptada.
El propósito de esta guía es detallar aún más qué es la contrapresión y cómo exactamente los streams abordan esto en el código fuente de Node.js. La segunda parte de la guía presentará las mejores prácticas sugeridas para asegurar que el código de su aplicación sea seguro y esté optimizado al implementar streams.
Asumimos cierta familiaridad con la definición general de contrapresión
, Buffer
y EventEmitters
en Node.js, así como cierta experiencia con Stream
. Si no ha leído esos documentos, no es mala idea echar un vistazo a la documentación de la API primero, ya que ayudará a expandir su comprensión mientras lee esta guía.
El Problema con el Manejo de Datos
En un sistema informático, los datos se transfieren de un proceso a otro a través de pipes, sockets y señales. En Node.js, encontramos un mecanismo similar llamado Stream
. ¡Los Streams son geniales! Hacen mucho por Node.js y casi todas las partes del código base interno utilizan ese módulo. Como desarrollador, ¡te animamos a que los uses también!
const readline = require('node:readline')
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
})
rl.question('¿Por qué deberías usar streams? ', answer => {
console.log(`¡Quizás sea ${answer}, quizás sea porque son increíbles!`)
})
rl.close()
Un buen ejemplo de por qué el mecanismo de contrapresión implementado a través de streams es una gran optimización se puede demostrar comparando las herramientas internas del sistema de la implementación de Stream de Node.js.
En un escenario, tomaremos un archivo grande (aproximadamente -9 GB) y lo comprimiremos utilizando la conocida herramienta zip(1)
.
zip The.Matrix.1080p.mkv
Si bien eso tardará unos minutos en completarse, en otra shell podemos ejecutar un script que toma el módulo zlib
de Node.js, que envuelve otra herramienta de compresión, gzip(1)
.
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')
const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')
inp.pipe(gzip).pipe(out)
Para probar los resultados, intenta abrir cada archivo comprimido. El archivo comprimido por la herramienta zip(1)
te notificará que el archivo está dañado, mientras que la compresión finalizada por Stream se descomprimirá sin errores.
Nota
En este ejemplo, usamos .pipe()
para obtener la fuente de datos de un extremo al otro. Sin embargo, observe que no hay manejadores de errores apropiados adjuntos. Si un fragmento de datos no se recibe correctamente, la fuente Readable o el stream gzip
no se destruirán. pump
es una herramienta de utilidad que destruiría correctamente todos los streams en una canalización si uno de ellos falla o se cierra, ¡y es imprescindible en este caso!
pump
solo es necesario para Node.js 8.x o anterior, ya que para Node.js 10.x o versiones posteriores, se introduce pipeline
para reemplazar pump
. Este es un método de módulo para canalizar entre streams que reenvía errores y limpia correctamente y proporciona una devolución de llamada cuando la canalización está completa.
Aquí hay un ejemplo de cómo usar pipeline:
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Utilice la API de pipeline para canalizar fácilmente una serie de streams
// juntos y recibir una notificación cuando la canalización esté completamente terminada.
// Una canalización para comprimir un archivo de video potencialmente enorme de manera eficiente:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('La canalización falló', err)
} else {
console.log('La canalización tuvo éxito')
}
}
)
También puedes usar el módulo stream/promises
para usar pipeline con async / await
:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
)
console.log('La canalización tuvo éxito')
} catch (err) {
console.error('La canalización falló', err)
}
}
Demasiados Datos, Demasiado Rápido
Hay casos en los que un flujo Readable
podría proporcionar datos al Writable
demasiado rápido, ¡mucho más de lo que el consumidor puede manejar!
Cuando eso ocurre, el consumidor comenzará a poner en cola todos los fragmentos de datos para su consumo posterior. La cola de escritura se hará cada vez más larga y, debido a esto, se debe mantener más datos en la memoria hasta que se complete todo el proceso.
Escribir en un disco es mucho más lento que leer de un disco, por lo tanto, cuando intentamos comprimir un archivo y escribirlo en nuestro disco duro, se producirá contrapresión porque el disco de escritura no podrá seguir el ritmo de la velocidad de la lectura.
// En secreto, el flujo está diciendo: "¡Oye, oye! espera, ¡esto es demasiado!"
// Los datos comenzarán a acumularse en el lado de lectura del búfer de datos a medida que
// la escritura intenta seguir el ritmo del flujo de datos entrante.
inp.pipe(gzip).pipe(outputFile)
Esta es la razón por la que un mecanismo de contrapresión es importante. Si no existiera un sistema de contrapresión, el proceso agotaría la memoria de tu sistema, ralentizando efectivamente otros procesos y monopolizando una gran parte de tu sistema hasta que se complete.
Esto resulta en algunas cosas:
- Ralentización de todos los demás procesos actuales.
- Un recolector de basura muy sobrecargado.
- Agotamiento de la memoria.
En los siguientes ejemplos, tomaremos el valor de retorno de la función .write()
y lo cambiaremos a true
, lo que desactiva de manera efectiva el soporte de contrapresión en el núcleo de Node.js. En cualquier referencia a un binario 'modificado'
, estamos hablando de ejecutar el binario de nodo sin la línea return ret;
, y en su lugar con el return true;
reemplazado.
Exceso de Carga en la Recolección de Basura
Echemos un vistazo a una rápida prueba comparativa. Usando el mismo ejemplo de arriba, ejecutamos algunas pruebas de tiempo para obtener un tiempo medio para ambos binarios.
prueba (#) | binario `node` (ms) | binario `node` modificado (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
tiempo promedio:| 55299 | 55975
Ambos tardan alrededor de un minuto en ejecutarse, por lo que no hay mucha diferencia en absoluto, pero echemos un vistazo más de cerca para confirmar si nuestras sospechas son correctas. Utilizamos la herramienta de Linux dtrace
para evaluar lo que está sucediendo con el recolector de basura V8.
El tiempo medido de GC (recolector de basura) indica los intervalos de un ciclo completo de un solo barrido realizado por el recolector de basura:
tiempo aprox. (ms) | GC (ms) | GC modificado (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
Si bien los dos procesos comienzan igual y parecen trabajar con el GC al mismo ritmo, se hace evidente que después de unos segundos con un sistema de contrapresión funcionando correctamente, extiende la carga del GC a través de intervalos consistentes de 4-8 milisegundos hasta el final de la transferencia de datos.
Sin embargo, cuando no hay un sistema de contrapresión en su lugar, la recolección de basura de V8 comienza a prolongarse. El binario normal que llama al GC se dispara aproximadamente 75 veces en un minuto, mientras que el binario modificado se dispara solo 36 veces.
Esta es la deuda lenta y gradual que se acumula debido al creciente uso de memoria. A medida que se transfieren los datos, sin un sistema de contrapresión en su lugar, se utiliza más memoria para cada transferencia de fragmentos.
Cuanta más memoria se asigne, más tendrá que ocuparse el GC en un solo barrido. Cuanto más grande sea el barrido, más tendrá que decidir el GC qué se puede liberar, y el escaneo de punteros desprendidos en un espacio de memoria más grande consumirá más potencia de cálculo.
Agotamiento de la Memoria
Para determinar el consumo de memoria de cada binario, hemos cronometrado cada proceso individualmente con /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
.
Esta es la salida en el binario normal:
Respetando el valor de retorno de .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 tamaño máximo del conjunto residente
0 tamaño promedio de memoria compartida
0 tamaño promedio de datos no compartidos
0 tamaño promedio de pila no compartida
19427 recuperaciones de páginas
3134 fallos de página
0 swaps
5 operaciones de entrada de bloque
194 operaciones de salida de bloque
0 mensajes enviados
0 mensajes recibidos
1 señales recibidas
12 cambios de contexto voluntarios
666037 cambios de contexto involuntarios
El tamaño máximo en bytes ocupado por la memoria virtual resulta ser aproximadamente 87.81 mb.
Y ahora, cambiando el valor de retorno de la función .write()
, obtenemos:
Sin respetar el valor de retorno de .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 tamaño máximo del conjunto residente
0 tamaño promedio de memoria compartida
0 tamaño promedio de datos no compartidos
0 tamaño promedio de pila no compartida
373617 recuperaciones de páginas
3139 fallos de página
0 swaps
18 operaciones de entrada de bloque
199 operaciones de salida de bloque
0 mensajes enviados
0 mensajes recibidos
1 señales recibidas
25 cambios de contexto voluntarios
629566 cambios de contexto involuntarios
El tamaño máximo en bytes ocupado por la memoria virtual resulta ser aproximadamente 1.52 gb.
Sin flujos en su lugar para delegar la contrapresión, hay un orden de magnitud mayor de espacio de memoria que se está asignando: ¡un margen enorme de diferencia entre el mismo proceso!
Este experimento muestra lo optimizado y rentable que es el mecanismo de contrapresión de Node.js para su sistema informático. ¡Ahora, vamos a analizar cómo funciona!
¿Cómo resuelve la contrapresión estos problemas?
Existen diferentes funciones para transferir datos de un proceso a otro. En Node.js, existe una función interna incorporada llamada .pipe()
. ¡También hay otros paquetes que puedes usar! Sin embargo, en última instancia, en el nivel básico de este proceso, tenemos dos componentes separados: la fuente de los datos y el consumidor.
Cuando se llama a .pipe()
desde la fuente, se le indica al consumidor que hay datos para transferir. La función pipe ayuda a configurar los cierres de contrapresión apropiados para los activadores de eventos.
En Node.js, la fuente es un flujo Readable
y el consumidor es el flujo Writable
(ambos pueden intercambiarse con un flujo Duplex o un flujo Transform, pero eso está fuera del alcance de esta guía).
El momento en que se activa la contrapresión se puede reducir exactamente al valor de retorno de la función .write()
de un Writable
. Este valor de retorno está determinado por algunas condiciones, por supuesto.
En cualquier escenario en el que el búfer de datos haya superado el highwaterMark
o la cola de escritura esté actualmente ocupada, .write()
devolverá false
.
Cuando se devuelve un valor false
, se activa el sistema de contrapresión. Pausará el flujo Readable
entrante para que no envíe ningún dato y esperará hasta que el consumidor esté listo nuevamente. Una vez que se vacía el búfer de datos, se emitirá un evento 'drain'
y se reanudará el flujo de datos entrante.
Una vez que la cola ha terminado, la contrapresión permitirá que los datos se envíen nuevamente. El espacio en la memoria que se estaba utilizando se liberará y se preparará para el próximo lote de datos.
Esto permite efectivamente que se use una cantidad fija de memoria en cualquier momento para una función .pipe()
. ¡No habrá fugas de memoria ni almacenamiento en búfer infinito, y el recolector de basura solo tendrá que lidiar con un área en la memoria!
Entonces, si la contrapresión es tan importante, ¿por qué (probablemente) no has oído hablar de ella? Bueno, la respuesta es simple: Node.js hace todo esto automáticamente por ti.
¡Eso es genial! Pero tampoco es tan genial cuando estamos tratando de entender cómo implementar nuestros propios flujos personalizados.
NOTA
En la mayoría de las máquinas, existe un tamaño de bytes que determina cuándo un búfer está lleno (lo que variará entre diferentes máquinas). Node.js te permite establecer tu highWaterMark
personalizado, pero comúnmente, el valor predeterminado se establece en 16 kb (16384 o 16 para flujos en objectMode). En los casos en los que desees aumentar ese valor, hazlo, ¡pero con precaución!
Ciclo de vida de .pipe()
Para lograr una mejor comprensión de la contrapresión, aquí hay un diagrama de flujo sobre el ciclo de vida de un flujo Readable
que se conecta a un flujo Writable
:
+===================+
x--> Funciones de canalización +--> src.pipe(dest) |
x se configuran durante |===================|
x el método .pipe. | Retrollamadas de eventos |
+===============+ x |-------------------|
| Tus Datos | x Existen fuera del | .on('close', cb) |
+=======+=======+ x flujo de datos, pero | .on('data', cb) |
| x es importante adjuntar | .on('drain', cb) |
| x eventos, y sus | .on('unpipe', cb) |
+---------v---------+ x retrollamadas respectivas.| .on('error', cb) |
| Flujo Readable +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Flujo Writable +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | ¿Es este fragmento demasiado grande? |
^ | | emit .end(); | ¿Está ocupada la cola? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Sí |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ cuando la cola está vacía +============+ |
^------------^-----------------------< Almacenamiento en búfer | |
| |============| |
+> emit .drain(); | ^Búfer^ | |
+> emit .resume(); +------------+ |
| ^Búfer^ | |
+------------+ añadir fragmento a la cola |
| <---^---------------------<
+============+
NOTA
Si está configurando una canalización para encadenar algunos flujos para manipular sus datos, lo más probable es que esté implementando el flujo Transform.
En este caso, la salida de su flujo Readable
entrará en el Transform
y se canalizará al Writable
.
Readable.pipe(Transformable).pipe(Writable)
La contrapresión se aplicará automáticamente, pero tenga en cuenta que tanto el highwaterMark
entrante como el saliente del flujo Transform
se pueden manipular y afectarán al sistema de contrapresión.
Directrices de contrapresión
Desde Node.js v0.10, la clase Stream ha ofrecido la capacidad de modificar el comportamiento de .read()
o .write()
utilizando la versión con guion bajo de estas funciones respectivas (._read()
y ._write()
).
Existen directrices documentadas para implementar streams legibles e implementar streams escribibles. Asumiremos que ya las has leído, y la siguiente sección profundizará un poco más.
Reglas a seguir al implementar streams personalizados
La regla de oro de los streams es respetar siempre la contrapresión. Lo que se considera una buena práctica es una práctica no contradictoria. Siempre y cuando tengas cuidado de evitar comportamientos que entren en conflicto con el soporte interno de contrapresión, puedes estar seguro de que estás siguiendo una buena práctica.
En general,
- Nunca uses
.push()
si no te lo piden. - Nunca llames a
.write()
después de que devuelva false, sino que espera a 'drain' en su lugar. - Los streams cambian entre diferentes versiones de Node.js y la biblioteca que utilizas. Ten cuidado y prueba las cosas.
NOTA
Con respecto al punto 3, un paquete increíblemente útil para construir streams de navegador es readable-stream
. Rodd Vagg ha escrito una gran publicación de blog que describe la utilidad de esta biblioteca. En resumen, proporciona un tipo de degradación gradual automatizada para streams legibles y es compatible con versiones anteriores de navegadores y Node.js.
Reglas específicas para Readable Streams
Hasta ahora, hemos examinado cómo .write()
afecta la contrapresión y nos hemos centrado mucho en el stream Writable. Debido a la funcionalidad de Node.js, los datos técnicamente fluyen aguas abajo desde Readable a Writable. Sin embargo, como podemos observar en cualquier transmisión de datos, materia o energía, la fuente es tan importante como el destino, y el stream Readable es vital para cómo se maneja la contrapresión.
Ambos procesos dependen el uno del otro para comunicarse eficazmente, si Readable ignora cuando el stream Writable le pide que deje de enviar datos, puede ser tan problemático como cuando el valor de retorno de .write()
es incorrecto.
Por lo tanto, además de respetar el retorno de .write()
, también debemos respetar el valor de retorno de .push()
utilizado en el método ._read()
. Si .push()
devuelve un valor falso, el stream dejará de leer de la fuente. De lo contrario, continuará sin pausa.
Aquí hay un ejemplo de mala práctica utilizando .push()
:
// Esto es problemático ya que ignora completamente el valor de retorno de push
// ¡que puede ser una señal de contrapresión del stream de destino!
class MyReadable extends Readable {
_read(size) {
let chunk
while (null == (chunk = getNextChunk())) {
this.push(chunk)
}
}
}
Además, desde fuera del stream personalizado, existen riesgos al ignorar la contrapresión. En este contraejemplo de buena práctica, el código de la aplicación fuerza los datos cada vez que están disponibles (señalizado por el evento 'data'
):
// Esto ignora los mecanismos de contrapresión que Node.js ha establecido,
// y empuja incondicionalmente los datos, independientemente de si el
// stream de destino está listo para ellos o no.
readable.on('data', data => writable.write(data))
Aquí hay un ejemplo de cómo usar .push()
con un stream Readable.
const { Readable } = require('node:stream')
// Crea un stream Readable personalizado
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// Empuja algunos datos al stream
this.push({ message: '¡Hola, mundo!' })
this.push(null) // Marca el final del stream
},
})
// Consume el stream
myReadableStream.on('data', chunk => {
console.log(chunk)
})
// Output:
// { message: '¡Hola, mundo!' }
Reglas específicas para Streams de escritura
Recordemos que un .write()
puede devolver verdadero o falso dependiendo de algunas condiciones. Afortunadamente para nosotros, al construir nuestro propio stream de escritura, la máquina de estados del stream manejará nuestras devoluciones de llamada y determinará cuándo manejar la contrapresión y optimizar el flujo de datos para nosotros. Sin embargo, cuando queremos usar un stream de escritura directamente, debemos respetar el valor de retorno de .write()
y prestar mucha atención a estas condiciones:
- Si la cola de escritura está ocupada,
.write()
devolverá falso. - Si el fragmento de datos es demasiado grande,
.write()
devolverá falso (el límite está indicado por la variable, highWaterMark).
En este ejemplo, creamos un stream de lectura personalizado que inserta un único objeto en el stream usando .push()
. El método ._read()
se llama cuando el stream está listo para consumir datos, y en este caso, insertamos inmediatamente algunos datos en el stream y marcamos el final del stream insertando null
.
const stream = require('stream')
class MyReadable extends stream.Readable {
constructor() {
super()
}
_read() {
const data = { message: '¡Hola, mundo!' }
this.push(data)
this.push(null)
}
}
const readableStream = new MyReadable()
readableStream.pipe(process.stdout)
Luego consumimos el stream escuchando el evento 'data' y registrando cada fragmento de datos que se inserta en el stream. En este caso, solo insertamos un único fragmento de datos en el stream, por lo que solo vemos un mensaje de registro.
Reglas específicas para Writable Streams
Recuerda que un .write()
puede devolver verdadero o falso dependiendo de algunas condiciones. Afortunadamente para nosotros, al construir nuestro propio stream Writable, la máquina de estados del stream manejará nuestros callbacks y determinará cuándo manejar la contrapresión y optimizar el flujo de datos para nosotros.
Sin embargo, cuando queremos usar un Writable directamente, debemos respetar el valor de retorno de .write()
y prestar mucha atención a estas condiciones:
- Si la cola de escritura está ocupada,
.write()
devolverá falso. - Si el fragmento de datos es demasiado grande,
.write()
devolverá falso (el límite lo indica la variable, highWaterMark).
class MyWritable extends Writable {
// Este Writable no es válido debido a la naturaleza asíncrona de los callbacks de JavaScript.
// Sin una declaración de retorno para cada callback anterior al último,
// hay una gran probabilidad de que se llamen varios callbacks.
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback()
else if (chunk.toString().indexOf('b') >= 0) callback()
callback()
}
}
También hay algunas cosas a tener en cuenta al implementar ._writev()
. La función está acoplada con .cork()
, pero hay un error común al escribir:
// Usar .uncork() dos veces aquí realiza dos llamadas en la capa C++, haciendo que la
// técnica de cork/uncork sea inútil.
ws.cork()
ws.write('hola ')
ws.write('mundo ')
ws.uncork()
ws.cork()
ws.write('de ')
ws.write('Matteo')
ws.uncork()
// La forma correcta de escribir esto es utilizar process.nextTick(), que se activa
// en el siguiente bucle de eventos.
ws.cork()
ws.write('hola ')
ws.write('mundo ')
process.nextTick(doUncork, ws)
ws.cork()
ws.write('de ')
ws.write('Matteo')
process.nextTick(doUncork, ws)
// Como función global.
function doUncork(stream) {
stream.uncork()
}
Se puede llamar a .cork()
tantas veces como queramos, solo debemos tener cuidado de llamar a .uncork()
la misma cantidad de veces para que vuelva a fluir.
Conclusión
Los streams son un módulo de uso frecuente en Node.js. Son importantes para la estructura interna y, para los desarrolladores, para expandir y conectar a través del ecosistema de módulos de Node.js.
Con suerte, ahora podrás solucionar problemas y codificar de forma segura tus propios streams Writable
y Readable
teniendo en cuenta la contrapresión, y compartir tus conocimientos con colegas y amigos.
Asegúrate de leer más sobre Stream
para conocer otras funciones API que te ayudarán a mejorar y liberar tus capacidades de streaming cuando construyas una aplicación con Node.js.