Skip to content

Commit

Permalink
Add python event processing examples
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 19, 2024
1 parent f514e7b commit 4fa755f
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 173 deletions.
73 changes: 0 additions & 73 deletions python/basics/app/5_events_processing.py

This file was deleted.

58 changes: 0 additions & 58 deletions python/basics/app/6_events_state.py

This file was deleted.

50 changes: 23 additions & 27 deletions python/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ Processing events (from Kafka) to update various downstream systems.

2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml`

3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeed run`
3. Start the service: `python -m hypercorn --config hypercorn-config.toml src/eventtransactions/user_feed:app`

4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Expand Down Expand Up @@ -484,26 +484,22 @@ Our Kafka broker only has a single partition so all these messages end up on the
You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially:

```shell
2024-12-17 18:07:43 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Creating post 300dbd34-eae8-4875-8a71-c18b14e2aed7 for user userid1
2024-12-17 18:07:43 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds
2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Creating post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 for user userid2
2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Content moderation for post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 is still pending... Will check again in 5 seconds
2024-12-17 18:07:48 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds
2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Content moderation for post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 is done
2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Updating user feed for user userid2 with post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8
2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-17 18:07:58 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds
2024-12-17 18:09:03 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is done
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Updating user feed for user userid1 with post 300dbd34-eae8-4875-8a71-c18b14e2aed7
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Creating post 738f0f12-8191-4702-bf49-59e1604ee799 for user userid1
2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Content moderation for post 738f0f12-8191-4702-bf49-59e1604ee799 is still pending... Will check again in 5 seconds
2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Content moderation for post 738f0f12-8191-4702-bf49-59e1604ee799 is done
2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Updating user feed for user userid1 with post 738f0f12-8191-4702-bf49-59e1604ee799
2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] dev.restate.sdk.core.InvocationStateMachine - End invocation
[2024-12-19 16:32:22,550] [694674] [INFO] - Created post d91524b2-843c-4bce-8bfa-662b75f4ad45 for user userid1 with content: Hi! This is my first post!
[2024-12-19 16:32:22,551] [694674] [INFO] - Content moderation for post d91524b2-843c-4bce-8bfa-662b75f4ad45 is still pending... Will check again in 5 seconds
[2024-12-19 16:32:24,720] [694678] [INFO] - Created post 56d5b415-65f5-4e24-9eb4-5565936e1426 for user userid2 with content: Hi! This is my first post!
[2024-12-19 16:32:24,722] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is still pending... Will check again in 5 seconds
[2024-12-19 16:32:29,734] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is still pending... Will check again in 5 seconds
[2024-12-19 16:32:32,571] [694674] [INFO] - Content moderation for post d91524b2-843c-4bce-8bfa-662b75f4ad45 is done
[2024-12-19 16:32:32,572] [694674] [INFO] - Updating the user feed for user userid1 with post d91524b2-843c-4bce-8bfa-662b75f4ad45
[2024-12-19 16:32:32,575] [694674] [INFO] - Created post b5b4a544-1b9d-4459-a3db-d4805853bb7f for user userid1 with content: Hi! This is my second post!
[2024-12-19 16:32:32,576] [694674] [INFO] - Content moderation for post b5b4a544-1b9d-4459-a3db-d4805853bb7f is still pending... Will check again in 5 seconds
[2024-12-19 16:32:37,587] [694674] [INFO] - Content moderation for post b5b4a544-1b9d-4459-a3db-d4805853bb7f is done
[2024-12-19 16:32:37,588] [694674] [INFO] - Updating the user feed for user userid1 with post b5b4a544-1b9d-4459-a3db-d4805853bb7f
[2024-12-19 16:32:39,760] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is still pending... Will check again in 5 seconds
[2024-12-19 16:32:44,770] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is still pending... Will check again in 5 seconds
[2024-12-19 16:33:59,900] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is still pending... Will check again in 5 seconds
[2024-12-19 16:34:04,909] [694678] [INFO] - Content moderation for post 56d5b415-65f5-4e24-9eb4-5565936e1426 is done
[2024-12-19 16:34:04,911] [694678] [INFO] - Updating the user feed for user userid2 with post 56d5b415-65f5-4e24-9eb4-5565936e1426
```
As you see, slow events do not block other slow events.
Expand Down Expand Up @@ -533,16 +529,16 @@ The Package Tracker Virtual Object tracks the package details and its location h
2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml`
3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTracker run`
3. Start the service: `python -m hypercorn --config hypercorn-config.toml src/eventenrichment/package_tracker:app`
4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`
5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message.
5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `package-tracker/updateLocation` on each message.
```shell
curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/package-location-updates",
"sink": "service://PackageTracker/updateLocation",
"sink": "service://package-tracker/updateLocation",
"options": {"auto.offset.reset": "earliest"}
}'
```
Expand All @@ -551,8 +547,8 @@ curl localhost:9070/subscriptions -H 'content-type: application/json' \
1. Register a new package via the RPC handler:
```shell
curl localhost:8080/PackageTracker/package1/registerPackage \
-H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}'
curl localhost:8080/package-tracker/package1/registerPackage \
-H 'content-type: application/json' -d '{"final_destination": "Bridge 6, Amsterdam"}'
```
2. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic:
Expand All @@ -567,7 +563,7 @@ package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Bruss
3. Query the package location via the RPC handler:
```shell
curl localhost:8080/PackageTracker/package1/getPackageInfo
curl localhost:8080/package-tracker/package1/getPackageInfo
```
or via the CLI: `restate kv get PackageTracker package1`
Expand Down
38 changes: 38 additions & 0 deletions python/patterns-use-cases/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: '3'
services:
broker:
image: confluentinc/cp-kafka:7.5.0
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

init-kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
"# blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server broker:29092 --list"
3 changes: 3 additions & 0 deletions python/patterns-use-cases/restate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://localhost:9092"]
Empty file.
1 change: 1 addition & 0 deletions python/patterns-use-cases/src/dataupload/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s')


class User(BaseModel):
id: str
email: str
Expand Down
Empty file.
55 changes: 55 additions & 0 deletions python/patterns-use-cases/src/eventenrichment/package_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from typing import List

import restate
from pydantic import BaseModel
from restate import VirtualObject, ObjectContext
from restate.exceptions import TerminalError


class LocationUpdate(BaseModel):
location: str
timestamp: str


class PackageInfo(BaseModel):
final_destination: str
locations: List[LocationUpdate] = []


# Package tracking system:
# Digital twin representing a package in delivery with real-time location updates.
# Handlers get called over HTTP or Kafka.
package_tracker = VirtualObject("package-tracker")


# Called first by the seller over HTTP
@package_tracker.handler("registerPackage")
async def register_package(ctx: ObjectContext, package_info: PackageInfo):
# store in state the user's information as coming from the registration event
ctx.set("package-info", package_info.model_dump())


# Connected to a Kafka topic for real-time location updates
@package_tracker.handler("updateLocation")
async def update_location(ctx: ObjectContext, location_update: LocationUpdate):
# get the package info from the state
package_info = PackageInfo(**await ctx.get("package-info"))
if package_info is None:
raise TerminalError(f"Package {ctx.key()} not found")

# Update the package details in the state
locations = package_info.locations or []
locations.append(location_update)
package_info.locations = locations

# store the updated package info in state
ctx.set("package-info", package_info.model_dump())


# Called by the delivery dashboard to get the package details
@package_tracker.handler("getPackageInfo")
async def get_package_info(ctx: ObjectContext) -> PackageInfo:
return PackageInfo(**await ctx.get("package-info"))


app = restate.app(services=[package_tracker])
Empty file.
37 changes: 37 additions & 0 deletions python/patterns-use-cases/src/eventtransactions/user_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import restate
from restate import VirtualObject, ObjectContext
from datetime import timedelta

from src.eventtransactions.utils import create_post, get_post_status, update_user_feed, SocialMediaPost

# Processing events (from Kafka) to update various downstream systems
# - Journaling actions in Restate and driving retries from Restate, recovering
# partial progress
# - Preserving the order-per-key, but otherwise allowing high-fanout, because
# processing of events does not block other events.
# - Ability to delay events when the downstream systems are busy, without blocking
# entire partitions.
user_feed = VirtualObject("UserFeed")


# The Kafka key routes events to the correct Virtual Object.
# Events with the same key are processed one after the other.
@user_feed.handler("processPost")
async def process_post(ctx: ObjectContext, post: SocialMediaPost):
user_id = ctx.key()

# event handler is a durably executed function that can use all the features of Restate
post_id = await ctx.run("profile update", lambda: create_post(user_id, post))

# Delay processing until content moderation is complete (handler suspends when on FaaS).
# This only blocks other posts for this user (Virtual Object), not for other users.
while await ctx.run("post status", lambda: get_post_status(post_id)) == "PENDING":
await ctx.sleep(timedelta(seconds=5))

await ctx.run("update feed", lambda: update_user_feed(user_id, post_id))


app = restate.app(services=[user_feed])

# Process new posts for users via Kafka or by calling the endpoint over HTTP: curl
# localhost:8080/userFeed/userid1/processPost --json '{"content": "Hi! This is my first post!", "metadata": "public"}'
Loading

0 comments on commit 4fa755f

Please sign in to comment.