El manejo de streams en NodeJS

16927235050_fb1bb19262_h.jpg

Imagen de Jack Landau

Parece mentira, pero muchas veces olvidamos que la mayoría de APIs con las que cuenta NodeJS en su ‘core’ están basadas en sistemas de streams. Las APIs nos acaban abstrayendo tanto del bajo nivel que olvidamos conceptos vistos en la programación tradicional de toda la vida.

Estas abstracciones nos ayuda a ser más productivos, pero muchas veces nos hacen olvidar la esencia de las cosas, el aprendizaje de los conceptos más básicos y hace que, algunos problemas que se podrían solucionarse de una forma más o menos sencilla, se nos compliquen por culpa de no tener estos conocimientos.

Me he dado cuenta que conocer los streams en NodeJS puede suponernos un cambio a la hora en la que programamos y en cómo damos solución a los problemas, por lo que creo que es bueno que nos detengamos un tiempo en aprender sobre ellos.

A lo largo del artículo explicaremos que es esto de los streams, veremos los diferentes tipos con los que contamos en NodeJS, mostraremos algunos casos en donde ya estamos haciendo uso de ellos, crearemos un nuevo tipo de stream y explicaremos que tienen que ver los streams con la Programación Funcional Reactiva.

Parece bastante ambicioso el plan ¿lo lograremos? ¿Nos adentramos un rato en el mundo de los streams en NodeJS? Adelante:

¿Qué es un stream?

La propia traducción al castellano es bastante significativa de la naturaleza de un stream. Stream en castellano significa: corriente, arroyo, flujo, torrente o chorro. Por tanto, un stream, en el mundo de la programación, no deja de ser una forma de obtener o depositar datos binarios de una fuente a otra en un flujo continuado.

Un stream es una transmisión por medio de secuencias. Nos permite leer, difundir o descargar estas secuencias por medio de un flujo continuo donde en canal de comunicación es fluido en el tiempo mientras nosotros lo deseemos o la información que se quiere transmitir ha terminado.

Estamos muy acostumbrados a disfrutar de sistemas de streaming como pueda ser la retransmisión de musca bajo demanda o la retransmisión de eventos en directo por medio de plataformas como Youtube por lo que nos es algo cotidiano.

Me gusta pensar en un stream como una cinta transportadora que comunica dos puntos. En esta cinta se ponen pequeñas piezas (conocidos como ‘chunks’) que por si solos no tienen sentido, pero que juntas aportan un valor, una información, un producto.

fuentes

Los streams suelen ser usados en sistemas donde obtener toda la información de un proceso no es posible de una forma atómica y se necesita desmenuzar el mensaje para ser retransmitido. La información es tan grande que el canal no lo soporta y tiene que enviarse de esta manera.

Puede darse el caso también, de que necesitemos recibir un volumen de eventos o peticiones muy elevado, que tienen un sentido en común pero que no puede recibirse todas a la vez, ya sea porque el canal es demasiado pequeño o porque los eventos y peticiones no se producen al unísonos. Los streams son un buen medio para recibir las peticiones.

El concepto no es nuevo y ya llevamos disfrutándolos en sistemas como Unix o Java para poder leer y escribir en ficheros u otros medios físicos de una forma eficiente. Esto no quita para que a muchos desarrolladores esto nos haya sonado a chino a día de hoy.

Por tanto, veamos los tipos de streams y para qué son usado.

Tipos de streams en NodeJS

Para poder hacer uso de la potencia de los streams en NodeJS  solo tenemos que importar el módulo que viene con el núcleo de esta manera:

const stream = require('stream');

A partir de aquí, el módulo cuenta con 4 tipos de streams que podemos consumir:

  • Stream Readable
  • Stream Writable
  • Stream Duplex
  • Stream Transform

Explicaremos cada uno de ellos detenidamente, pero antes, hagamos un inciso para explicar con que tipo de datos operan los streams y cómo se comporta el buffer interno, un comportamiento y una operabilidad que son comunes a los 4 tipos de streams:

Object Mode y Buffering

Los streams creados en NodeJS pueden almacenar en su buffer exclusivamente datos de tipo string. Sin embargo, y como veremos en próximos apartados, existe la posibilidad de implementar streams que funciones con otros tipos de datos o de valores de JavaScript es lo que se conoce como modo objeto (object mode).

Además, cuando queremos hacer uso de un stream, podemos indicar el tamaño del buffer. Es lo que se conoce como ‘highWaterMark’. De esta forma podemos evitar que nuestra memoria se sobrecargue, poniendo un límite. Lo que hace el módulo es leer o escribir del buffer hasta que se llega al límite configurado. En ese caso, se para el proceso de flujo y se espera a que se consuma o distribuya el chorro de bytes que se inserta.

Una vez que tenemos claro esto, empecemos con los diferentes tipos:

Stream Readable

Es una abstracción para consumir datos de un origen determinado. Yo puedo hacer uso de este stream de la siguiente manera:

const Readable = require('stream').Readable;
const rs = new Readable();

rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

El stream de lectura siempre se da por terminado cuando se inserta un ‘null’ .

Esta abstracción cuenta con los siguientes eventos a los que me puedo registrar – todos los streams de node al final heredan de ‘EventEmitter‘:

  • close: el evento es emitido cuando el stream ha sido cerrado.
  • end: el evento es emitido cuando no hay más datos a consumir.
  • error: el evento se genera cuando existe un error durante la transmisión del stream.
  • readable: el evento se emite cuando hay datos disponibles para ser leídos por el stream.

Hay también que tener en cuenta que contamos con 3 métodos importantes: ‘isPaused’ y ‘pause’ y ‘resume’ que nos permite parar, reanudar un stream o saber en que estado se encuentra el stream. Podríamos hacer esto:

const readable = new stream.Readable();

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

Stream Writable

Nos permite escribir un flujo de manera dinámica en un destino especificado.

const Writable = require('stream').Writable;
const myStream = new Writable();

myStream.write('some data'); 
myStream.write('some more data'); 
myStream.end('done writing data');

Esta abstracción cuenta con los siguientes eventos a los que me puedo registrar:

  • close: el evento es emitido cuando el stream ha sido cerrado.
  • finish: el evento es emitido después de ‘stream.end()’ y todos los datos se han eliminado del buffer.
  • error: el evento se genera cuando existe un error durante la transmisión del stream.

Stream Duplex

Los streams duplex son aquellos que se comportan bien tanto para lectura como para escritura. En NodeJS existen varios de estos streams. Para entenderlos, pensemos en un teléfono que permite flujo de información tanto en un sentido como en el otro. Un Stream Duplex es capaz de hacer esta operación:

a.pipe(b).pipe(a);

Como su comportamiento es el conjunto de los dos anteriores, no creo que sea necesario un ejemplo. Simplemente pensar en el comportamiento de ‘request’ o ‘response’ del módulo ‘http’.

streams-in-node-js-18-638.jpg

Stream Transform

Un stream transform es un caso específico de stream duplex. Es también un stream de lectura y escritura, con la diferencia que nos permite realizar transformaciones sobre la lectura o sobre la escritura.

Implementando un nuevo tipo de stream

Es raro que un desarrollador tenga que consumir el módulo stream de las maneras que hemos explicado anteriormente. Nos viene bien para conocer sus mecanismos, pero lo más normal es que cuando necesitemos hacer uso de un nuevo tipo de stream que no se encuentra en el propio NodeJS, implementemos nuestra propia solución a partir de ‘stream’. Esto es posible a la extensibilidad que presenta el módulo, como veremos a continuación.

Pongamos un ejemplo: vamos a desarrollar un contador que nos permita ir mostrando por la salida estándar una cuenta atrás.

Para conseguir esto, vamos a extender el stream duplex con el que cuenta NodeJS. Extender este stream puede hacerse de 3 formas:

O hacerlo en un formato ES6:

const Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
    constructor(options) {
        super(options);
    }
}

O hacerlo en un formato ES5:

const Duplex = require('stream').Duplex;
const util = require('util');

function MyDuplex(options) {
    if (!(this instanceof MyDuplex))
        return new MyDuplex(options);
    Duplex.call(this, options);
}

util.inherits(MyDuplex, Duplex);

O usando el constructor de la propia clase duplex:

const Duplex = require('stream').Duplex;

const myDuplex = new Duplex({
    read(size) {
        // ...
    },
    write(chunk, encoding, callback) {
       // ...
    }
});

Nosotros usaremos la aproximación de ES6. Nuestro stream quedará de esta forma:

const Duplex = require('stream').Duplex;

class DuplexCount extends Duplex {
    constructor(count, options) {
        super(options);
        this.count = count;
    }

    _read(size) {
        this.push(this.count.toString());

        if (this.count-- === 0) {
            this.push(null);
        }

    }

    _write(chunk, encoding, callback) {
        console.log(`write: ${chunk.toString()}`);
        callback();
    }
}

Lo que tenemos que hacer es extender los métodos ‘_read’ y ‘_write’. De esta forma, conseguimos tener un stream duplex de doble dirección ya que sabe escribir y leer. La demostración más directa es que podemos concatenar el stream para ir generando una salida y mostrarla por pantalla.

Para usarlo, solo tenemos que hacer esto:

const duplex = new DuplexCount(1000);
duplex.pipe(duplex);

Usos prácticos de los streams

Hay 3 casos muy claros donde estás haciendo uso de streams y quizá no te hayas dado cuenta:

  • Cuando lees y escribes de la salida estándar del sistema
  • Cuando lees y escribes en ficheros de disco
  • Cuando recibes y respondes en llamadas http

Veamos ejemplos de cada uno de ellos:

Leer y escribir en la salida estándar

Escribir y leer de la salida estándar tiene que ser un proceso lo  menos bloqueante posible. Tanto ‘process.stdout’ y ‘process.stdin’ hacen el uso de stream para recoger o pintar datos de manera fluida.

Leer y escribir en ficheros

Estamos muy acostumbrado en NodeJS a leer y escribir en ficheros de manera asíncrona. La forma en lo que lo hemos hecho siempre es la siguiente:

fs.readFile(__dirname + '/data.txt', function (err, data) {
    console.log(data);
});

El algoritmo anterior funciona, pero en sistemas de alta carga, este tipo de lectura puede ser un problema. ‘readFile’ es un método que no termina hasta que no ha volcado todo el contenido del fichero en memoria. Como el fichero sea grande, puede limitar mucho las posibilidades de nuestro sistema ya que puede existir un proceso bloqueante que consume mucho.

El uso de streams puede ayudarnos a crear un flujo donde se vaya leyendo y mostrando de una manera más más continua. En NodeJS ya existe un método que nos permite incluir datos en un stream de esta forma:

const stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(process.stdout);

De esta manera estamos escribiendo en la salida estándar a la vez que leemos del fichero. No existe un proceso bloqueante y la funcionalidad gana en prestaciones y continuidad. Con la escritura de ficheros nos ocurre lo mismo

Recibir y responder en llamada http

Otra entrada / salida típica de nuestros sistemas, suele ser las comunicaciones por el protocolo http. El envío y recepción de cuerpos de la llamada puede ser bastante grande y su tratamiento de una manera fluida pueda ayudarnos a crear sistemas más rápidos. En el momento que voy recibiendo datos de un cliente, los voy procesando.

El método ‘createServer’ del módulo ‘http’ tiene una función de ‘callback’ que nos devuelve dos streams tipo ‘Duplex’ llamados request (contiene los datos de la petición del cliente) y response (contiene los datos de la respuesta del servidor). Si volvemos al ejemplo anterior, podemos crear un flujo continuo de envío de un fichero a un cliente que ha hecho una petición a nuestro sistema:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

De esta manera, a la vez que estoy leyendo el fichero, lo voy enviando, consiguiendo un flujo que permite no sobrecargar la memoria.

Los streams y la Programación Funcional Reactiva

Los streams encajan muy bien en este paradigma de programación por dos razones:

Los streams pueden ser funcionales

Los streams en NodeJS presentan un método muy útil en programación funcional, se trata del método ‘pipe’. Este método nos permite encadenar diferentes streams para su manipulación por medio de cómputos. Lo que hace es recibir un streams de entrada, realiza una operación sobre dicho stream y devuelve un nuevo stream con dicha transformación.

Si tuviéramos dos streams con nombre a y b, podríamos componerlos de la siguiente manera:

a.pipe(b)

Las ideas de composición, encadenamiento de cómputos, modularización y  flujo de datos en una tubería unidireccional son conceptos tan arraigados en la programación funcional que los streams parecen creados para este paradigma.

Un ejemplo muy claro de esto es el siguiente algoritmo:

fs.createReadStream('file.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('file.txt.gz'));

Lo que hace este algoritmo es leer un fichero, comprimir su contenido y guardarlo en un nuevo fichero con la nueva extensión de la compresión. Las funciones se encadenan dejando un código muy declarativo y permitiéndonos encadenar cualquier otro cómputo que sepa trabajar con streams.

Si lo veis, y habéis usado ‘Gulp’, os habrá venido a la cabeza su manera de trabajar. ‘Gulp’ es una de las librerías que más trabaja con streams y encadenamiento de cómputos por medio de ‘pipe’.

Los streams son reactivos

La programación reactiva es el paradigma enfocado en el trabajo de flujos de datos finitos o infinitos de manera asíncrona. La programación reactiva tiene como base cuatro puntos necesarios:

  • Tiene que ser responsiva, es decir, el sistema tiene que asegurar la calidad del servicio cumpliendo unos tiempos de respuesta establecidos. Parece que los streams cumplen perfectamente el cometido, pues su uso se basa en la optimización de recursos por medio de flujos.
  • Tiene que ser resiliente, o lo que es lo mismo, ser responsiva incluso ante situaciones en las que se produce un error. Hemos visto como los streams de node son capaces de controlar errores y la programación funcional es un buen paradigma para esta gestión.
  • Tiene que ser elástico, el aumento de la carga de trabajo, no hace decaer el sistema.
  • Orientado a mensajes, minimizan el acoplamiento entre componentes al establecer interacciones basadas en el intercambio de mensajes de manera asíncrona. trabajamos en NodeJS, el sistema asíncrono por antonomasia, los streams no iban a ser menos. Además el uso de streams combinado con librerías como RxJS nos da una potencia muy grande gracias a la posibilidad de poder incluir observadores en el flujo de transmisión y/o transformación.

Parece entonces, que los stream, son la base de la programación reactiva.

Conclusión

Los streams ayudan en nuestra labor de modulizarización y reutilización de componentes. Al poder trabajar con flujos desacoplados, podemos concatenar diferentes tipos de streams, transformándolos y compartiéndolos con diferentes soportes.

En sistemas donde la Entrada/Salida se comporta de una manera asíncrona, como en NodeJS, no parece ninguna locura la posibilidad de hacer un uso masivo de streams. Los streams ayudan a consumir menos recursos y permitir comunicación entre canales y terminales con diferente velocidad y por tanto son una necesidad para todo desarrollador que necesita trabajar en entornos de alto rendimiento y disponibilidad.

Que NodeJS ya abstraiga todos los tipos de streams necesarios para la mayoría de operaciones, no es una excusa para que no sepamos su comportamiento interno y es necesario que siempre que podamos profundicemos en el funcionamiento de las herramientas que utilizamos.

Nos leemos 🙂

Anuncios

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s