If you have written even a small Node.js app, chances are you have used streams somewhere along the way, whether you were aware of it or not.
In fact, streams are part of the Node.js core. Every request or response from a server, every console.log call and many filesystem operations use streams in one way or another. 💥
We can picture a stream as an interface for a data flow a---b---c
which can change over time and where data flows from a source to a destination.
For example, using streams we can read a file in chunks with a ReadableStream, apply some kind of transformation with a TransformStream and, finally, write every chunk of data to a specific destination using a WritableStream.
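The read, transform, write flow described above can be sketched with Node's built-in stream module. This is only a minimal illustration: an in-memory source stands in for the file so the snippet runs without touching the disk, and the names source, upperCase, sink and received are made up for this example.

```javascript
const { Readable, Transform, Writable } = require('stream')

// Readable: an in-memory source standing in for a file read in chunks.
const source = Readable.from(['a', 'b', 'c'])

// Transform: upper-cases every chunk that flows through it.
const upperCase = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// Writable: collects every chunk into an array (our "destination").
const received = []
const sink = new Writable({
  write (chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

// source --> upperCase --> sink
source.pipe(upperCase).pipe(sink)

// 'finish' fires once the sink has handled all the data.
sink.on('finish', () => {
  console.log(received.join('')) // prints "ABC"
})
```

Swapping the source for fs.createReadStream(...) or the sink for fs.createWriteStream(...) should leave the rest of the chain unchanged, which is the point of the common interface.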
Some streams operate in a single direction, like a ReadableStream, which only reads from a source and passes the data to the next stream:
ReadableStream ---> ( DuplexStream | TransformStream ) ---> WritableStream
Other types of streams can operate bidirectionally, i.e., they can perform both reads and writes. This type of stream is known as a DuplexStream. One example is the well-known Socket.
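To make the idea of a stream that both reads and writes more concrete, here is a minimal sketch of a Duplex built with Node's stream module. It does not model a real socket: createEchoDuplex is a hypothetical helper that simply makes whatever is written to it readable again, like a loopback connection.

```javascript
const { Duplex } = require('stream')

// Hypothetical in-memory "echo" peer: data written to the writable
// side comes back out of the readable side.
function createEchoDuplex () {
  return new Duplex({
    write (chunk, encoding, callback) {
      this.push(chunk) // hand the written data to the readable side
      callback()
    },
    final (callback) {
      this.push(null) // end the readable side once writing has ended
      callback()
    },
    read () {
      // Nothing to pull: data is pushed from write().
    }
  })
}

const echo = createEchoDuplex()
echo.on('data', chunk => console.log('read back:', chunk.toString()))
echo.write('ping')
echo.end()
```

A real socket differs in that its two sides talk to a remote peer instead of to each other, but it is used in the same way: write to it and read from it.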
!> It is important to keep in mind that this kind of interface exists only to define a common, scalable, and efficient way to communicate data. Streams help us abstract the source and the destination: it does not matter whether we are reading from or writing to a file or the network. Streams speak a common language, and this allows us to combine them however we need.
🔗 If you want to keep on learning about streams, we recommend you read the stream-handbook.
Internally, hypercore uses streams to do its work.
We can read our feed using feed.createReadStream and display its contents on the console using process.stdout:
feed.createReadStream().pipe(process.stdout)
As you can see, we are using the pipe method to connect our streams and define the data flow between them.
// a --> b --> c
// unix like: a | b | c
a.pipe(b).pipe(c)
OK, let's suppose we have a local feed and a remote public key. At some point we want to read data from this other feed; since we have its public key (PK), we can decrypt its data.
But before decrypting the data, we need to fetch it and merge it into our local feed. This process is called replication.
We want to replicate remote feed data into our local feed.
In order to do this, we are going to use streams. The Hypercore API has a feed.replicate() method which returns a replication stream that reads the remote feed, syncs it with the local feed and finally passes the result to the next stream. In other words, it behaves like a DuplexStream.
With replicate() we can combine the remote feed with our local feed, but the remote feed also needs to be made aware of the data in our local feed.
!> Eventually, all the peers should have the same (up to date) data.
If we see the connection between two peers as a bidirectional connection, we can do the following:
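const r1 = remoteFeed.replicate()
const r2 = localFeed.replicate()
// (1) r1 -> r2: remote data is replicated into the local feed
// (2) r2 -> r1: local data is sent back to the remote feed
r1.pipe(r2).pipe(r1)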
- First, data is received from the remote feed and replicated into our local feed.
- Once our feed is updated, data is sent to the remote feed. This is done to ensure consistency.
- Finally, both feeds have the same data.
We are going to simulate reading messages from another peer. To do that we need to:
- Sync the local feed with the remote one.
- Once sync is done, read data from our local feed and push each message into an array.
- When we finish reading our feed, we need to return the messages list.
By design, if we have streams connected by .pipe and one of them fails, the rest are not closed automatically and keep working.
This can lead to multiple error conditions. We want to destroy all the streams if one fails. That's why we are going to use pump instead of .pipe.
!> As an extra feature, with pump we can pass a function as the last argument. This function will be executed when all the streams finish, or as soon as one of them fails, in which case it receives the error.
a.pipe(b).pipe(c)
// becomes
pump(a, b, c, err => {
  if (err) return console.error('a stream failed:', err)
  console.log('all streams have finished')
})
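To see the "destroy everything when one stream fails" behaviour in action without installing anything, the sketch below uses stream.pipeline, the helper built into Node.js that behaves much like pump. It is a stand-in for pump, not the module itself. The createStreams helper and the stream names are hypothetical.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')

// Builds three fresh streams; the middle one fails on the chunk 'boom'.
function createStreams () {
  const source = Readable.from(['ok', 'boom', 'never'])
  const failing = new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      if (chunk === 'boom') return callback(new Error('boom'))
      callback(null, chunk)
    }
  })
  const sink = new Writable({
    objectMode: true,
    write (chunk, encoding, callback) {
      callback()
    }
  })
  return { source, failing, sink }
}

// pipeline (like pump) tears down every stream when one of them
// fails and reports the error to the final callback.
const { source, failing, sink } = createStreams()
pipeline(source, failing, sink, err => {
  console.log('error:', err.message)
  console.log('failing destroyed:', failing.destroyed)
  console.log('sink destroyed:', sink.destroyed)
})
```

With plain .pipe the sink would be left open after the failure, and we would have to listen for 'error' on every stream and clean up by hand.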
A WritableStream receives every chunk of data flowing through our streams, and we can write each one wherever we want, e.g. the filesystem, the network, memory, etc.
We have made a special function, forEachChunk, which can be seen as a little helper to write data (and, of course, it is a WritableStream).
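As a rough idea of how such a helper and the "read everything into an array" step could fit together, here is a sketch built only on Node's stream module. It is not the workshop's forEachChunk: forEachChunkSketch and collectMessages are hypothetical names, stream.pipeline stands in for pump, and an in-memory source stands in for feed.createReadStream().

```javascript
const { Readable, Writable, pipeline } = require('stream')

// Hypothetical stand-in for a forEachChunk-style helper: an
// object-mode Writable that calls `onChunk` for every chunk.
function forEachChunkSketch (onChunk) {
  return new Writable({
    objectMode: true,
    write (chunk, encoding, callback) {
      try {
        onChunk(chunk)
        callback()
      } catch (err) {
        callback(err) // surface errors thrown by the handler
      }
    }
  })
}

// Reads every message from `readable` and resolves with the full list.
function collectMessages (readable) {
  return new Promise((resolve, reject) => {
    const messages = []
    pipeline(
      readable,
      forEachChunkSketch(message => messages.push(message)),
      err => (err ? reject(err) : resolve(messages))
    )
  })
}

// Usage: an in-memory source stands in for feed.createReadStream().
collectMessages(Readable.from(['hello', 'world'])).then(messages => {
  console.log(messages) // [ 'hello', 'world' ]
})
```

Returning a promise is just one way to hand back the list once reading has finished; a callback would work equally well.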
$ npm test ./04