Add descriptions and fix formatting
Felix Hennig committed Sep 12, 2024
1 parent efdbebb commit 6cddc91
Showing 5 changed files with 71 additions and 73 deletions.
124 changes: 61 additions & 63 deletions docs/modules/demos/pages/data-lakehouse-iceberg-trino-spark.adoc

== Spark

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html[Spark Structured Streaming] is used to stream data from Kafka into the lakehouse.

=== Accessing the web interface

To access the Spark web interface, run the following command to forward port 4040 to your local machine.

[source,console]
----
# port-forward command elided in this excerpt
----

image::data-lakehouse-iceberg-trino-spark/spark_1.png[]

=== Listing the running Structured Streaming jobs

The UI displays the last job runs.
Each running Structured Streaming job creates lots of Spark jobs internally.
Click on the `Structured Streaming` tab to see the running streaming jobs.

image::data-lakehouse-iceberg-trino-spark/spark_2.png[]

Five streaming jobs are currently running.
You can also click on a streaming job to get more details.
For the job `ingest smart_city shared_bikes_station_status`, click on the `Run ID` highlighted in blue to open it up.

image::data-lakehouse-iceberg-trino-spark/spark_3.png[]

=== How the Structured Streaming jobs work

The demo has started all the running streaming jobs. Look at the {demo-code}[demo code] to see the actual code submitted to Spark.
This document will explain one specific ingestion job - `ingest water_level measurements`.

The streaming job is written in Python using `pyspark`.
First off, the schema used to parse the JSON coming from Kafka is defined.
Nested structures or arrays are supported as well.
The schema differs from job to job.

[source,python]
----
schema = StructType([ \
    # field definitions elided in this excerpt
])
----
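
To illustrate the nested structures mentioned above, a schema with an array of structs might be declared as sketched below. The `station_uuid`, `timestamp` and `value` fields mirror the columns selected later in this job; the nested `readings` array and its columns are hypothetical and only illustrate how nesting is expressed.

[source,python]
----
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType, ArrayType
)

# Sketch only: the demo's actual schema differs per topic.
schema = StructType([
    StructField("station_uuid", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", DoubleType(), True),
    # Hypothetical nested array of structs to show that nesting is supported
    StructField("readings", ArrayType(StructType([
        StructField("sensor", StringType(), True),
        StructField("reading", DoubleType(), True),
    ])), True),
])
----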

Afterwards, a streaming read from Kafka is started. It reads from our Kafka at `kafka:9093` with the topic `water_levels_measurements`.
When starting up, the job will read all the existing messages in Kafka (read from earliest) and will process at most 50000000 records in a single batch.
As Kafka has retention set up, records might age out of the topic before Spark has read them, which can be the case when the Spark application wasn't running for too long or had crashed.
In the case of this demo, the streaming job should not error out.
For a production job, `failOnDataLoss` should be set to `true` so that missing data does not go unnoticed - Kafka offsets then need to be adjusted manually, along with some post-loading of data.

*Note:* The following Python snippets belong to a single Python statement but are split into separate blocks for better explanation.

[source,python]
----
# full read configuration elided in this excerpt
spark \
.load() \
----
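
Put together, a read with the options described above might look roughly like the following. This is a sketch based on the prose, not the demo's verbatim code, and it assumes an active `SparkSession` named `spark`.

[source,python]
----
# Streaming read from Kafka with the options described above.
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9093") \
    .option("subscribe", "water_levels_measurements") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 50000000) \
    .option("failOnDataLoss", "false") \
    .load()
----

For a production job, the same chain would set `failOnDataLoss` to `"true"`, as discussed above.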

So far we have a `readStream` reading from Kafka. Records on Kafka are simply byte streams, so they must be converted to strings and the JSON needs to be parsed.

[source,python]
----
# string conversion and JSON parsing steps elided in this excerpt
.select("json.station_uuid", "json.timestamp", "json.value") \
----
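
A typical way to express this conversion is sketched below, assuming the streaming DataFrame from the read above is called `kafka_df` as in the earlier sketch; the demo's actual chain may differ.

[source,python]
----
from pyspark.sql.functions import col, from_json

# Cast the Kafka value bytes to a string, parse it with the schema defined earlier,
# and keep only the fields needed for the lakehouse table.
parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) AS value") \
    .select(from_json(col("value"), schema).alias("json")) \
    .select("json.station_uuid", "json.timestamp", "json.value")
----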

After all these transformations, we need to specify the sink of the stream, in this case, the Iceberg lakehouse.
We are writing in the `iceberg` format using the `update` mode rather than the "normal" `append` mode.
Spark will aim for a micro-batch every 2 minutes and save its checkpoints (its current offsets on the Kafka topic) in the specified S3 location.
Afterwards, the streaming job will be started by calling `.start()`.

[source,python]
----
# sink configuration elided in this excerpt
----

One important part was skipped during the walkthrough:

[source,python]
----
.foreachBatch(upsertWaterLevelsMeasurements) \
----
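
Combining the sink options described above with the `foreachBatch` hook, the write side of the job might look roughly like this. It is a sketch, not the demo's verbatim code: `parsed_df` refers to the parsed DataFrame from the earlier sketch, and the checkpoint path is a placeholder.

[source,python]
----
# Micro-batches every 2 minutes, checkpoints in S3, records handed to the upsert function.
query = parsed_df \
    .writeStream \
    .queryName("ingest water_level measurements") \
    .outputMode("update") \
    .trigger(processingTime="2 minutes") \
    .option("checkpointLocation", "s3a://<bucket>/checkpoints/water-levels-measurements") \
    .foreachBatch(upsertWaterLevelsMeasurements) \
    .start()
----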

`upsertWaterLevelsMeasurements` is a Python function that describes inserting the records from Kafka into the lakehouse table.
This specific streaming job removes all duplicate records that can occur because of how the PegelOnline API works and gets called.
As we don't want duplicate rows in our lakehouse tables, we need to filter the duplicates out as follows.

[source,python]
----
def upsertWaterLevelsMeasurements(microBatchOutputDF, batchId):
    # function body elided in this excerpt
    """)
----
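
Since the body of the function is collapsed in this excerpt, here is a sketch of what such an upsert function typically looks like; the exact `MERGE` condition is an assumption based on the columns selected earlier.

[source,python]
----
def upsertWaterLevelsMeasurements(microBatchOutputDF, batchId):
    # Expose the micro-batch to Spark SQL as a temporary view.
    microBatchOutputDF.createOrReplaceTempView("waterLevelsMeasurementsUpserts")

    # De-duplicate the batch and insert only records not already in the table.
    microBatchOutputDF.sparkSession.sql("""
        MERGE INTO lakehouse.water_levels.measurements AS t
        USING (SELECT DISTINCT * FROM waterLevelsMeasurementsUpserts) AS u
        ON t.station_uuid = u.station_uuid AND t.timestamp = u.timestamp
        WHEN NOT MATCHED THEN INSERT *
    """)
----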

First, the data frame containing the upserts (records from Kafka) is registered as a temporary view so that it can be accessed via Spark SQL.
Afterwards, the `MERGE INTO` statement adds the new records to the lakehouse table.

The incoming records are first de-duplicated (using `SELECT DISTINCT * FROM waterLevelsMeasurementsUpserts`) so that the data from Kafka does not contain duplicates.
Afterwards, the - now duplication-free - records get added to the `lakehouse.water_levels.measurements` table, but *only* if they are not already present.

=== The Upsert mechanism

The `MERGE INTO` statement can be used for de-duplicating data and updating existing rows in the lakehouse table.
The `ingest water_level stations` streaming job uses the following `MERGE INTO` statement:

[source,sql]
----
-- beginning of the MERGE INTO statement elided in this excerpt
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
----

First, the data within a batch is de-duplicated as well.
The record containing the station update with the highest Kafka timestamp is the newest and will be used during Upsert.

If a record for a station (detected by the same `station_uuid`) already exists, its contents will be updated.
If the station is yet to be discovered, it will be inserted.
The `MERGE INTO` also supports updating subsets of fields and more complex calculations, e.g. incrementing a counter.
For details, have a look at the {iceberg-merge-docs}[Iceberg MERGE INTO documentation].
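
As an illustration of the "highest Kafka timestamp wins" rule, the in-batch de-duplication can be expressed with a window function like in the sketch below. The view name `waterLevelsStationsUpserts` and the `kafka_timestamp` column name are assumptions; the demo's actual statement may be structured differently.

[source,python]
----
# Keep only the newest update per station within the batch.
deduplicated = spark.sql("""
    SELECT * FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY station_uuid
                   ORDER BY kafka_timestamp DESC
               ) AS row_num
        FROM waterLevelsStationsUpserts
    ) AS ranked
    WHERE row_num = 1
""")
----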

=== The Delete mechanism

The `MERGE INTO` statement can de-duplicate data and update existing lakehouse table rows.
For details have a look at the {iceberg-merge-docs}[Iceberg MERGE INTO documentation].

=== Table maintenance

As mentioned, Iceberg supports out-of-the-box {iceberg-table-maintenance}[table maintenance] such as compaction.

This demo executes some maintenance functions in a rudimentary Python loop with timeouts in between.
When running in production, the maintenance can be scheduled using Kubernetes {k8s-cronjobs}[CronJobs] or {airflow}[Apache Airflow],
which the Stackable Data Platform also supports.

[source,python]
----
while True:
    # maintenance procedure calls elided in this excerpt
    time.sleep(25 * 60)  # Assuming compaction takes 5 min, run every 30 minutes
----
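
The procedures listed below are invoked via Spark SQL. As a sketch, a single iteration over one table might issue calls like these, assuming an active `SparkSession` named `spark` with the `lakehouse` Iceberg catalog configured; the parameter values are illustrative.

[source,python]
----
# Remove old snapshots and the data files they reference, then compact small files.
spark.sql("""
    CALL lakehouse.system.expire_snapshots(
        table => 'water_levels.measurements',
        retain_last => 50
    )
""")
spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'water_levels.measurements')")
----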

The scripts have a dictionary of all the tables to run maintenance on.
The following procedures are run:

==== https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots[expire_snapshots]

Expand Down Expand Up @@ -495,8 +492,8 @@ Here you can see all the available Trino catalogs.

== Superset

Superset provides the ability to execute SQL queries and build dashboards.
Open the Superset endpoint `external-http` in your browser (http://87.106.122.58:32452 in this case).

image::data-lakehouse-iceberg-trino-spark/superset_1.png[]

image::data-lakehouse-iceberg-trino-spark/superset_2.png[]

=== Viewing the Dashboard

The demo has created dashboards to visualize the different data sources.
Select the `Dashboards` tab at the top to view these dashboards.

image::data-lakehouse-iceberg-trino-spark/superset_3.png[]

Click on the dashboard called `House sales`.
It might take some time until the dashboard renders all the included charts.

image::data-lakehouse-iceberg-trino-spark/superset_4.png[]

There are multiple other dashboards you can explore on your own.

=== Viewing Charts

The dashboards consist of multiple charts.
To list the charts, select the `Charts` tab at the top.

=== Executing arbitrary SQL statements

Within Superset, you can create dashboards and run arbitrary SQL statements.
At the top, click on the tab `SQL` -> `SQL Lab`.

image::data-lakehouse-iceberg-trino-spark/superset_7.png[]

On the left, select the database `Trino lakehouse`, the schema `house_sales`, and set `See table schema` to `house_sales`.

[IMPORTANT]
====
The older screenshot below shows how the table preview would look. Currently, there is an https://github.com/apache/superset/issues/25307[open issue] with previewing Trino tables using the Iceberg connector.
This doesn't affect the following execution of the SQL statement.
====

image::data-lakehouse-iceberg-trino-spark/superset_8.png[]

In the right textbox, you can enter the desired SQL statement.
If you want to avoid making one up, use the following:

[source,sql]
----
-- example query elided in this excerpt
----
5 changes: 2 additions & 3 deletions docs/modules/demos/pages/hbase-hdfs-load-cycling-data.adoc

Secondly, we'll use `org.apache.hadoop.hbase.tool.LoadIncrementalHFiles` (see {bulkload}[bulk load docs]) to import the HFiles into the table and ingest rows.

Now we will see how many rows are in the `cycling-tripdata` table:

image::hbase-hdfs-load-cycling-data/hbase-table-ui.png[]

== Accessing the HDFS web interface

You can also see HDFS details via a UI by running `stackablectl stacklet list` and following the link next to one of the namenodes.

Below you will see the overview of your HDFS cluster.

What might also be interesting is the average and current measurement of the stations:

[source,sql]
----
select
stations.longname as station,
avg("value") as avg_measurement,
latest_by("value", measurements."__time") as current_measurement,
-- remaining columns and FROM clause elided in this excerpt
----
8 changes: 4 additions & 4 deletions docs/modules/demos/pages/signal-processing.adoc
image::signal-processing/overview.png[]

== Data ingestion

The data used in this demo is a set of gas sensor measurements*.
The dataset provides resistance values (r-values hereafter) for each of 14 gas sensors.
In order to simulate near-real-time ingestion of this data, it is downloaded and batch-inserted into a Timescale table.
It's then updated in-place retaining the same time offsets but shifting the timestamps such that the notebook code can "move through" the data using windows as if it were being streamed.
The NiFi flow that does this can easily be extended to process other sources of (actually streamed) data.
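
The timestamp-shifting idea can be illustrated with plain SQL against the Timescale table. The sketch below (using Python and `psycopg2`) is purely illustrative - the demo performs this step inside the NiFi flow, and the connection settings, table name and column name are assumptions.

[source,python]
----
import psycopg2

# Hypothetical connection and table names - adjust to your environment.
with psycopg2.connect("host=timescaledb dbname=demo user=demo password=demo") as conn:
    with conn.cursor() as cur:
        # Shift every timestamp forward by the same delta so the newest reading
        # lands at "now" while the relative offsets between readings are preserved.
        cur.execute("""
            UPDATE gas_sensor_measurements
            SET ts = ts + (now() - (SELECT max(ts) FROM gas_sensor_measurements))
        """)
----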

== JupyterHub
5 changes: 3 additions & 2 deletions stacks/stacks-v2.yaml
---
stacks:
  monitoring:
    description: Stack containing Prometheus and Grafana
    stackableRelease: 24.7
    stackableOperators:
      - commons
      - listener
    labels:
      - monitoring
      - prometheus
