|
| 1 | +--- |
| 2 | +title: Tracking and Transforming Data via Streams |
| 3 | +sidebar_label: Stream |
| 4 | +--- |
| 5 | + |
| 6 | +A stream in Databend is a dynamic and real-time representation of changes to a table. Streams are created to capture and track modifications to the associated table, allowing continuous consumption and analysis of data changes as they occur. |
| 7 | + |
| 8 | +### How Stream Works |
| 9 | + |
| 10 | +This section provides a quick example illustrating what a stream looks like and how it works. Let's say we have a table named 't' and we create a stream to capture the table changes. Once created, the stream starts to capture data changes to the table: |
| 11 | + |
| 12 | + |
| 13 | + |
| 14 | +**A Databend stream currently operates in an Append-only mode**. In this mode, the stream exclusively contains data insertion records, reflecting the latest changes to the table. Although data updates and deletions are not directly recorded, they are still taken into account. |
| 15 | + |
| 16 | +For example, if a row is added and later updated with new values, the stream records the insertion along with the updated values. Similarly, if a row is added and subsequently deleted, the stream reflects these changes accordingly: |
| 17 | + |
| 18 | + |
| 19 | + |
| 20 | +**A stream can be consumed by DML (Data Manipulation Language) operations**. After consumption, the stream contains no data but can continue to capture new changes, if any. |
| 21 | + |
| 22 | + |
| 23 | + |
| 24 | +### Transactional Support for Stream Consumption |
| 25 | + |
| 26 | +In Databend, stream consumption is transactional within single-statement transactions. This means: |
| 27 | + |
| 28 | +**Successful Transaction**: If a transaction is committed, the stream is consumed. For instance: |
| 29 | +```sql |
| 30 | +INSERT INTO table SELECT * FROM stream; |
| 31 | +``` |
| 32 | +If this `INSERT` transaction commits, the stream is consumed. |
| 33 | + |
| 34 | +**Failed Transaction**: If the transaction fails, the stream remains unchanged and available for future consumption. |
| 35 | + |
| 36 | +**Concurrent Access**: *Only one transaction can successfully consume a stream at a time*. If multiple transactions attempt to consume the same stream, only the first committed transaction succeeds, others fail. |
| 37 | + |
| 38 | +### Table Metadata for Stream |
| 39 | + |
| 40 | +**A stream does not store any data for a table**. After creating a stream for a table, Databend introduces specific hidden metadata columns to the table for change tracking purposes. These columns include: |
| 41 | + |
| 42 | +| Column | Description | |
| 43 | +|-----------------------|-----------------------------------------------------------------------------------| |
| 44 | +| _origin_version | Identifies the table version in which this row was initially created. | |
| 45 | +| _origin_block_id | Identifies the block ID to which this row belonged previously. | |
| 46 | +| _origin_block_row_num | Identifies the row number within the block to which this row belonged previously. | |
| 47 | + |
| 48 | +To display the values of these columns, use the SELECT statement: |
| 49 | + |
| 50 | +```sql title='Example:' |
| 51 | +CREATE TABLE t(a int); |
| 52 | +INSERT INTO t VALUES (1); |
| 53 | +CREATE STREAM s ON TABLE t; |
| 54 | +INSERT INTO t VALUES (2); |
| 55 | +SELECT *, _origin_version, _origin_block_id, _origin_block_row_num |
| 56 | +FROM t; |
| 57 | +┌───────────────────────────────────────────────────────────────────────────────────────┐ |
| 58 | +│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ |
| 59 | +├─────────────────┼──────────────────┼──────────────────────────┼───────────────────────┤ |
| 60 | +│ 1 │ NULL │ NULL │ NULL │ |
| 61 | +│ 2 │ NULL │ NULL │ NULL │ |
| 62 | +└───────────────────────────────────────────────────────────────────────────────────────┘ |
| 63 | + |
| 64 | +UPDATE t SET a = 3 WHERE a = 2; |
| 65 | +SELECT *, _origin_version, _origin_block_id, _origin_block_row_num |
| 66 | +FROM t; |
| 67 | + |
| 68 | +┌─────────────────────────────────────────────────────────────────────────────────────────────────────┐ |
| 69 | +│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ |
| 70 | +├─────────────────┼──────────────────┼────────────────────────────────────────┼───────────────────────┤ |
| 71 | +│ 1 │ NULL │ NULL │ NULL │ |
| 72 | +│ 3 │ 10930 │ 44506450595794391199934376694987431316 │ 0 │ |
| 73 | +└─────────────────────────────────────────────────────────────────────────────────────────────────────┘ |
| 74 | +``` |
| 75 | + |
| 76 | +### Stream Columns |
| 77 | + |
| 78 | +You can use the SELECT statement to directly query a stream and retrieve the tracked changes. When querying a stream, consider incorporating these hidden columns for additional details about the changes: |
| 79 | + |
| 80 | +| Column | Description | |
| 81 | +|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 82 | +| change$action | Type of change: INSERT or DELETE. | |
| 83 | +| change$is_update | Indicates whether the `change$action` is part of an UPDATE. In a stream, an UPDATE is represented by a combination of DELETE and INSERT operations, with this field set to `true`. | |
| 84 | +| change$row_id | Unique identifier for each row to track changes. | |
| 85 | + |
| 86 | +```sql title='Example:' |
| 87 | +CREATE TABLE t(a int); |
| 88 | +INSERT INTO t VALUES (1); |
| 89 | +CREATE STREAM s ON TABLE t; |
| 90 | +INSERT INTO t VALUES (2); |
| 91 | + |
| 92 | +SELECT a, change$action, change$is_update, change$row_id |
| 93 | +FROM s; |
| 94 | + |
| 95 | +┌─────────────────────────────────────────────────────────────────────────────────────────────┐ |
| 96 | +│ a │ change$action │ change$is_update │ change$row_id │ |
| 97 | +├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ |
| 98 | +│ 2 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ |
| 99 | +└─────────────────────────────────────────────────────────────────────────────────────────────┘ |
| 100 | + |
| 101 | +-- If you add a new row and then update it, |
| 102 | +-- the stream considers the changes as an INSERT with your updated value. |
| 103 | +UPDATE t SET a = 3 WHERE a = 2; |
| 104 | +SELECT a, change$action, change$is_update, change$row_id |
| 105 | +FROM s; |
| 106 | + |
| 107 | +┌─────────────────────────────────────────────────────────────────────────────────────────────┐ |
| 108 | +│ a │ change$action │ change$is_update │ change$row_id │ |
| 109 | +├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ |
| 110 | +│ 3 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ |
| 111 | +└─────────────────────────────────────────────────────────────────────────────────────────────┘ |
| 112 | +``` |
| 113 | + |
| 114 | +### Managing Streams |
| 115 | + |
| 116 | +To manage streams in Databend, use the following commands: |
| 117 | + |
| 118 | +<IndexOverviewList /> |
| 119 | + |
| 120 | +### Example: Tracking and Transforming Data in Real-Time |
| 121 | + |
| 122 | +The following example demonstrates how to use streams to capture and track user activities in real-time. |
| 123 | + |
| 124 | +#### 1. Creating Tables |
| 125 | + |
| 126 | +The example uses three tables: |
| 127 | +* `user_activities` table records user activities. |
| 128 | +* `user_profiles` table stores user profiles. |
| 129 | +* `user_activity_profiles` table is a combined view of the two tables. |
| 130 | + |
| 131 | +The `activities_stream` table is created as a stream to capture real-time changes to the `user_activities` table. The stream is then consumed by a query to update the` user_activity_profiles` table with the latest data. |
| 132 | + |
| 133 | +```sql |
| 134 | +-- Create a table to record user activities |
| 135 | +CREATE TABLE user_activities ( |
| 136 | + user_id INT, |
| 137 | + activity VARCHAR, |
| 138 | + timestamp TIMESTAMP |
| 139 | +); |
| 140 | + |
| 141 | +-- Create a table to store user profiles |
| 142 | +CREATE TABLE user_profiles ( |
| 143 | + user_id INT, |
| 144 | + username VARCHAR, |
| 145 | + location VARCHAR |
| 146 | +); |
| 147 | + |
| 148 | +-- Insert data into the user_profiles table |
| 149 | +INSERT INTO user_profiles VALUES (101, 'Alice', 'New York'); |
| 150 | +INSERT INTO user_profiles VALUES (102, 'Bob', 'San Francisco'); |
| 151 | +INSERT INTO user_profiles VALUES (103, 'Charlie', 'Los Angeles'); |
| 152 | +INSERT INTO user_profiles VALUES (104, 'Dana', 'Chicago'); |
| 153 | + |
| 154 | +-- Create a table for the combined view of user activities and profiles |
| 155 | +CREATE TABLE user_activity_profiles ( |
| 156 | + user_id INT, |
| 157 | + username VARCHAR, |
| 158 | + location VARCHAR, |
| 159 | + activity VARCHAR, |
| 160 | + activity_timestamp TIMESTAMP |
| 161 | +); |
| 162 | +``` |
| 163 | + |
| 164 | +#### 2. Creating a Stream |
| 165 | + |
| 166 | +Create a stream on the `user_activities` table to capture real-time changes: |
| 167 | +```sql |
| 168 | +CREATE STREAM activities_stream ON TABLE user_activities; |
| 169 | +``` |
| 170 | + |
| 171 | +#### 3. Inserting Data into the Source Table |
| 172 | + |
| 173 | +Insert data into the `user_activities` table to make some changes: |
| 174 | +```sql |
| 175 | +INSERT INTO user_activities VALUES (102, 'logout', '2023-12-19 09:00:00'); |
| 176 | +INSERT INTO user_activities VALUES (103, 'view_profile', '2023-12-19 09:15:00'); |
| 177 | +INSERT INTO user_activities VALUES (104, 'edit_profile', '2023-12-19 10:00:00'); |
| 178 | +INSERT INTO user_activities VALUES (101, 'purchase', '2023-12-19 10:30:00'); |
| 179 | +INSERT INTO user_activities VALUES (102, 'login', '2023-12-19 11:00:00'); |
| 180 | +``` |
| 181 | + |
| 182 | +#### 4. Consuming the Stream to Update the Target Table |
| 183 | + |
| 184 | +Consume the stream to update the `user_activity_profiles` table: |
| 185 | +```sql |
| 186 | +-- Inserting data into the user_activity_profiles table |
| 187 | +INSERT INTO user_activity_profiles |
| 188 | +SELECT |
| 189 | + a.user_id, p.username, p.location, a.activity, a.timestamp |
| 190 | +FROM |
| 191 | + -- Source table for changed data |
| 192 | + activities_stream AS a |
| 193 | +JOIN |
| 194 | + -- Joining with user profile data |
| 195 | + user_profiles AS p |
| 196 | +ON |
| 197 | + a.user_id = p.user_id |
| 198 | + |
| 199 | +-- a.change$action is a column indicating the type of change (Databend only supports INSERT for now) |
| 200 | +WHERE a.change$action = 'INSERT'; |
| 201 | +``` |
| 202 | + |
| 203 | +Then, check the updated `user_activity_profiles` table: |
| 204 | +```sql |
| 205 | +SELECT |
| 206 | + * |
| 207 | +FROM |
| 208 | + user_activity_profiles |
| 209 | + |
| 210 | +┌────────────────────────────────────────────────────────────────────────────────────────────────┐ |
| 211 | +│ user_id │ username │ location │ activity │ activity_timestamp │ |
| 212 | +├─────────────────┼──────────────────┼──────────────────┼──────────────────┼─────────────────────┤ |
| 213 | +│ 103 │ Charlie │ Los Angeles │ view_profile │ 2023-12-19 09:15:00 │ |
| 214 | +│ 104 │ Dana │ Chicago │ edit_profile │ 2023-12-19 10:00:00 │ |
| 215 | +│ 101 │ Alice │ New York │ purchase │ 2023-12-19 10:30:00 │ |
| 216 | +│ 102 │ Bob │ San Francisco │ login │ 2023-12-19 11:00:00 │ |
| 217 | +│ 102 │ Bob │ San Francisco │ logout │ 2023-12-19 09:00:00 │ |
| 218 | +└────────────────────────────────────────────────────────────────────────────────────────────────┘ |
| 219 | +``` |
| 220 | + |
| 221 | +#### 5. Task Update for Real-Time Data Processing |
| 222 | + |
| 223 | +To keep the `user_activity_profiles` table current, it's important to periodically synchronize it with data from the `activities_stream`. This synchronization should be aligned with the update intervals of the `user_activities` table, ensuring that the user_activity_profiles accurately reflects the latest user activities and profiles for real-time data analysis. |
| 224 | + |
| 225 | +The Databend `TASK` command(currently in private preview), can be utilized to define a task that updates the `user_activity_profiles` table every minute or seconds. |
| 226 | + |
| 227 | +```sql |
| 228 | +-- Define a task in Databend |
| 229 | +CREATE TASK user_activity_task |
| 230 | +WAREHOUSE = 'default' |
| 231 | +SCHEDULE = 1 MINUTE |
| 232 | +-- Trigger task when new data arrives in activities_stream |
| 233 | +WHEN system$stream_has_data('activities_stream') AS |
| 234 | + -- Insert new records into user_activity_profiles |
| 235 | + INSERT INTO user_activity_profiles |
| 236 | + SELECT |
| 237 | + -- Join activities_stream with user_profiles based on user_id |
| 238 | + a.user_id, p.username, p.location, a.activity, a.timestamp |
| 239 | + FROM |
| 240 | + activities_stream AS a |
| 241 | + JOIN user_profiles AS p |
| 242 | + ON a.user_id = p.user_id |
| 243 | + -- Include only rows where the action is 'INSERT' |
| 244 | + WHERE a.change$action = 'INSERT'; |
| 245 | +``` |
| 246 | + |
| 247 | +:::tip Task in Private Preview |
| 248 | +The `TASK` command is currently in private preview, so the synatx and usage may change in the future. |
| 249 | +::: |
0 commit comments