Si has programado en Node.js es muy probable que hayas usado streams sin siquiera saberlo.
De hecho, son tan importantes que forman parte del core de Node. Cada request o response de tu
servidor, cada console.log
u operación sobre el filesystem involucra algún tipo de stream. 💥
Un stream es una interfaz que representa una secuencia de datos a---b---c
en el tiempo y en donde
la información fluye desde una fuente hacia un destino.
Los streams nos permiten por ejemplo leer un archivo por partes (chunks) a través de un ReadableStream
,
aplicarle algún tipo de transformación por medio de un TransformStream
y escribir cada chunk modificado
en un destino particular con un WritableStream
.
Los streams pueden operar en un solo sentido como un ReadableStream que solo lee de una fuente y envía sus datos al siguiente stream:
ReadableStream ---> ( DuplexStream | TransformStream ) ---> WriteableStream
Pero también existen los DuplexStream
que permiten operaciones tanto de lectura como escritura.
Un ejemplo seria un Socket
!> Lo importante es tener en cuenta que dichas interfaces existen para definir una única forma de operar sobre datos de forma eficiente y escalable. No importa si los datos se leen o escriben desde el disco o de una conexión de red, los streams hablan un solo lenguaje y eso nos permite combinarlos como necesitemos.
🔗 No es parte del workshop avanzar demasiado en este tema pero si querés aprender mas te recomendamos el stream-handbook.
Internamente hypercore
utiliza streams para cumplir sus objetivos.
Podemos leer los datos de nuestro feed
utilizando feed.createReadStream
y mostrar los datos en pantalla:
feed.createReadStream().pipe(process.stdout)
Como veras, console.log
es un WritableStream
en donde el destino es escribir en pantalla.
Utilizamos el método pipe
para conectar y definir el flujo de datos de nuestros streams.
// a --> b --> c
// unix like: a | b | c
a.pipe(b).pipe(c)
Supongamos que tenemos un feed local que utiliza la key pública de un feed remoto. En algún momento vamos a querer leer sus datos, tenemos su key por lo que podemos desencriptarlos.
Pero antes de desencriptar los datos deberíamos poder obtenerlos, traerlos a nuestro feed local y unificarlos
con los datos que ya tenemos. A este proceso lo llamamos replicacion
.
Queremos replicar los datos del feed remoto en nuestro feed local.
Para poder lograrlo, volvemos a utilizar streams. Hypercore API ofrece un feed.replicate()
que retorna un
replication stream el cual lee la data de un feed remoto, la incorpora a su feed local y finalmente pasa el resultado
al siguiente stream, es decir se comporta como un DuplexStream
.
Con replicate()
podemos replicar los datos de un feed remoto en nuestro feed local pero tambien debemos
pensar que el feed remoto puede estar desactualizado.
!> Todos los peers deberían tener, eventualmente, la última versión de los datos.
Si tomamos en cuenta que la conexión entre dos peers es bidireccional podríamos hacer lo siguiente:
// (1) (2)
const r1 = remoteFeed.replicate()
const r2 = localFeed.replicate()
r1.pipe(r2).pipe(r1)
- Primero recibimos los datos de un feed remoto y los replicamos en nuestro feed local.
- Una vez que tenemos nuestro feed actualizado, enviamos los datos nuevamente al feed remoto para que se actualice en caso de tener data inconsistente.
- Al final, ambos feed tienen la misma versión de los datos.
Vamos a simular leer mensajes que otro peer escribio. Para eso:
- Vamos a sincronizar el feed local (con el del peer).
- Una vez finalizada la sincronización, leeremos los datos del feed y cargaremos cada mensaje en un array.
- Una vez finalizada la lectura del feed, retornar el listado de mensajes.
Por implementación de Node, si tenemos streams conectados por .pipe
y uno de ellos
se destruye, el resto sigue funcionando.
Nosotros queremos que si algún stream se destruye (intencionalmente o por error) que todos los streams conectados también lo hagan. Por eso vamos a utilizar el modulo pump para remplazar a pipe.
Pump nos permite pipear nuestros streams y asegurarnos que en caso de que uno se destruya, todos lo hagan. 🆒
Como feature extra, el último argumento de pump puede ser una función que se ejecuta cuando finalizan todos los streams.
a.pipe(b).pipe(c)
// to
pump(a, b, c, err => {
console.log('all streams have finished')
})
Un WritableStream nos permite iterar sobre los chunks que fluyen en los streams y escribirlos en donde queramos: disco, network, screen o inclusive en nuestra memoria.
Sabiendo esto, podemos definir un WritableStream que itere sobre los chunks de forma similar a un [].forEach
y guardarlos en la estructura (un Map
por ejemplo) que necesitemos.
Les recomendamos que investiguen forEachChunk
, una función que armamos para ayudarlos a cumplir su objetivo.
$ npm test ./04