title | description |
---|---|
Mastodon trends | Real-time analytics on the Mastodon firehose |
Mastodon is a federated social network, similar to a distributed Twitter. Conveniently, it provides a public firehose of all posts, which lets us see how Arroyo can be used in a real-world setting.
In this tutorial we are going to build a simple dashboard that shows the most popular hashtags across the Mastodon network.
Before starting this tutorial, you will need a running Arroyo cluster. See the getting started guide to get that set up.
The first thing we need to do is create a source that will read from the Mastodon firehose. We will be using the Server-Sent Events API (also known as EventSource) to read from the firehose, via the Mastodon streaming API.
Arroyo comes with an SSE connector, so we just need to create a connection that points to the Mastodon API. Connections in Arroyo allow queries to read and write data from external sources.
There are two ways to create a connection. We can create a re-usable connection in the Connections tab, or we can create a connection directly in SQL. For this tutorial, we'll create the connection directly in SQL.
Navigate to the Pipelines tab (http://localhost:5115/pipelines), and click Create Pipeline.
Connection tables are created via CREATE TABLE statements whose WITH clause includes a connector option, which specifies which connector should be used; in this case sse (for server-sent events). Also required is format, which specifies the format of the data (in this case, json).
Each connector also has its own set of options. For SSE, we need to set the endpoint option to tell it where to connect. We're also setting the events option to filter the events that we will read. You can find all of the options for the SSE connector in the connectors docs.
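To make the role of the events filter more concrete, here is a minimal Python sketch of how a server-sent event stream is framed and how filtering on the event type might work. It is an illustration only, not how the Arroyo connector is implemented; the parse_sse helper and the SAMPLE payload are hypothetical.

```python
def parse_sse(stream_text, wanted_event="update"):
    """Return the data payload of each SSE message whose event type matches."""
    payloads = []
    # SSE messages are separated by a blank line.
    for block in stream_text.strip().split("\n\n"):
        event_type = "message"  # default type when no "event:" field is sent
        data_lines = []
        for line in block.splitlines():
            if line.startswith(":"):
                continue  # comment line, often used as a keep-alive
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)
        if event_type == wanted_event and data_lines:
            payloads.append("\n".join(data_lines))
    return payloads


# Hypothetical sample stream: two updates, one delete, one keep-alive comment.
SAMPLE = (
    "event: update\n"
    'data: {"id": "1", "content": "hello"}\n'
    "\n"
    "event: delete\n"
    "data: 42\n"
    "\n"
    ":thump\n"
    "\n"
    "event: update\n"
    'data: {"id": "2", "content": "world"}\n'
)

updates = parse_sse(SAMPLE)
print(updates)
```

Only the two update messages survive the filter, which is the effect we want from setting events to update.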
We can define the mastodon source connection like this:
CREATE TABLE mastodon (
  id TEXT,
  uri TEXT,
  content TEXT
) WITH (
  connector = 'sse',
  format = 'json',
  endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
  events = 'update'
);
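If the server you connect to requires authentication, a token can be passed with the headers option:
WITH (
  ...
  headers = 'Authorization: Bearer <auth_token>'
)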
Then we can read from it, to get a sense of the data:
select * from mastodon;
Click Start Preview to run a preview pipeline over the data.
The preview should show a stream of rows with the id, uri, and content of each post.
Next we need to extract the hashtags from the content. Conveniently, the Mastodon API already extracts hashtags for us like this:
{
  ...
  "tags": [
    {
      "name": "arroyostreaming",
      "url": "https://mastodon.social/tags/arroyostreaming"
    }
  ]
}
To access the tags, we'll need to define our source slightly differently, using the raw string format. This will give us a single column called value with the raw JSON data.
CREATE TABLE mastodon (
  value TEXT
) WITH (
  connector = 'sse',
  format = 'raw_string',
  endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
  events = 'update'
);
Now we can pull out the tags using the extract_json function:
SELECT tag FROM (
  SELECT extract_json(value, '$.tags[*].name') AS tag
  FROM mastodon)
WHERE tag IS NOT NULL;
The extract_json function takes a string value and a jsonpath expression and returns all matches in the input for the expression. In this case, it returns a list with the name of each tag in the message.
We can turn this expression into a view (in Arroyo, a way to re-use a specific bit of SQL computation) to make it easier to work with:
CREATE VIEW tags AS (
  SELECT btrim(unnest(tags), '"') AS tag FROM (
    SELECT extract_json(value, '$.tags[*].name') AS tags
    FROM mastodon)
);
We've also added a call to unnest, which is a special operator that is able to "unroll" lists into individual records. This lets us handle all of the tags in a single message. Then we've wrapped that in btrim, because extract_json returns JSON-encoded strings like "tag"; btrim removes the leading and trailing " characters so we just get tag.
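The three steps in the view (extract, unnest, trim) can be mimicked in plain Python to see what each one contributes. This is a rough sketch of the behavior described above, not Arroyo's implementation; the helper names and the second tag in the sample data are made up for illustration.

```python
import json


def extract_tag_names(raw):
    """Rough stand-in for extract_json(value, '$.tags[*].name').

    Returns each match as a JSON-encoded string, quotes included.
    """
    doc = json.loads(raw)
    return [json.dumps(tag["name"]) for tag in doc.get("tags", []) if "name" in tag]


def unnest_and_trim(lists):
    """Rough stand-in for btrim(unnest(tags), '"'): one output row per list item."""
    rows = []
    for encoded_list in lists:
        for encoded in encoded_list:
            rows.append(encoded.strip('"'))  # drop leading and trailing quotes
    return rows


# Hypothetical raw messages, shaped like the "tags" field shown earlier.
messages = [
    '{"id": "1", "tags": [{"name": "arroyostreaming"}, {"name": "rust"}]}',
    '{"id": "2", "tags": []}',
]

encoded = [extract_tag_names(m) for m in messages]
tags = unnest_and_trim(encoded)
print(encoded)
print(tags)
```

A message with two tags produces two rows and a message with no tags produces none, which is why unnest is needed before counting.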
The next step is to find the count of each hashtag over a window of time. We'll use a sliding window that's 15 minutes wide (in other words, that looks back that far in the data stream) with a 5-second slide (in other words, that updates every 5 seconds). In SQL, we can introduce a sliding window with the hop function.
SELECT tag, count(*) AS count
FROM tags
GROUP BY tag, hop(interval '5 seconds', interval '15 minutes');
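To build intuition for sliding windows, the sketch below counts tags per window in plain Python, using small integer timestamps instead of real time. It assumes windows start at multiples of the slide, cover a half-open range, and have a width that is a multiple of the slide; how Arroyo aligns its windows internally is not covered here, and the function names are hypothetical.

```python
from collections import Counter


def hop_windows(ts, slide, width):
    """Return (start, end) for every sliding window that contains ts.

    Assumes window starts are multiples of `slide`, windows are [start, end),
    and `width` is a multiple of `slide`.
    """
    last_start = (ts // slide) * slide  # latest window start at or before ts
    starts = range(last_start, last_start - width, -slide)
    return [(start, start + width) for start in starts]


def hop_counts(events, slide, width):
    """Count occurrences of each tag in each sliding window."""
    counts = Counter()
    for ts, tag in events:
        for window in hop_windows(ts, slide, width):
            counts[(window, tag)] += 1
    return counts


# (timestamp in seconds, tag); a 15-second window with a 5-second slide
# stands in for the 15-minute window used in the query.
events = [(1, "rust"), (7, "rust"), (7, "sql"), (12, "rust")]
counts = hop_counts(events, slide=5, width=15)

for (window, tag), n in sorted(counts.items()):
    print(window, tag, n)
```

Each event lands in width / slide overlapping windows (three here), which is why the same post contributes to several consecutive results.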
Finally, we can find the top tags by using a SQL window function to rank the tags by count. Note that SQL window functions are not the same as Arroyo windows: SQL window functions perform calculations over multiple rows of data, while Arroyo windows aggregate over time. See [here](https://en.wikipedia.org/wiki/Window_function_(SQL)) for more on SQL window functions. The ranking query looks like this:
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY window
    ORDER BY count DESC) AS row_num
  FROM (
    SELECT count(*) AS count,
      tag,
      hop(interval '5 seconds', interval '15 minutes') AS window
    FROM tags
    GROUP BY tag, window)
) WHERE row_num <= 5;
This produces the top 5 tags over the last 15 minutes by filtering on the row_num column, which contains that tag's rank within the window.
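The ranking step can be pictured as "group by window, sort by count, number the rows, keep the first few". The following Python sketch shows that logic on made-up rows; it only illustrates what ROW_NUMBER() with PARTITION BY and ORDER BY computes, and the window labels and counts are hypothetical.

```python
from collections import defaultdict


def top_n_per_window(rows, n):
    """Keep the n highest-count tags in each window.

    rows: iterable of (window, tag, count).
    Returns (window, tag, count, row_num) with row_num starting at 1.
    """
    by_window = defaultdict(list)
    for window, tag, count in rows:
        by_window[window].append((tag, count))  # PARTITION BY window

    ranked = []
    for window in sorted(by_window):
        # ORDER BY count DESC
        ordered = sorted(by_window[window], key=lambda r: r[1], reverse=True)
        for row_num, (tag, count) in enumerate(ordered, start=1):
            if row_num <= n:  # WHERE row_num <= n
                ranked.append((window, tag, count, row_num))
    return ranked


# Hypothetical per-window counts, as produced by the aggregation step.
rows = [
    ("w1", "rust", 9), ("w1", "sql", 4), ("w1", "python", 7),
    ("w2", "sql", 5), ("w2", "rust", 2),
]
top = top_n_per_window(rows, n=2)
for row in top:
    print(row)
```

The rank restarts at 1 in every window, so the filter keeps the top entries of each window rather than the top entries overall.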
Now we have our complete query:
CREATE TABLE mastodon (
  value TEXT
) WITH (
  connector = 'sse',
  format = 'raw_string',
  endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
  events = 'update'
);
CREATE VIEW tags AS (
  SELECT btrim(unnest(tags), '"') AS tag FROM (
    SELECT extract_json(value, '$.tags[*].name') AS tags
    FROM mastodon)
);
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY window
    ORDER BY count DESC) AS row_num
  FROM (
    SELECT count(*) AS count,
      tag,
      hop(interval '5 seconds', interval '15 minutes') AS window
    FROM tags
    GROUP BY tag, window)
) WHERE row_num <= 5;
We can run the pipeline for real by clicking Launch. Give it a name, then click Start to run the pipeline.