Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kotlin event processing examples #248

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions README.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@

## Use Cases and Patterns

#### Orchestration patterns
- **[Sagas](patterns-use-cases/README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/main/kotlin/my/example/sagas/BookingWorkflow.kt)

#### Event processing
- **[Transactional Event Processing](patterns-use-cases/README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt)
- **[Event Enrichment / Joins](patterns-use-cases/README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt)

## Integrations

Examples integrating Restate with other tools and frameworks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SubscriptionService {
}

for (subscription in req.subscriptions) {
ctx.run {
ctx.runBlock {
createSubscription(req.userId, subscription, payRef)
}
}
Expand Down
179 changes: 179 additions & 0 deletions kotlin/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Kotlin Patterns and Use Cases

#### Orchestration patterns
- **[Sagas](README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/main/kotlin/my/example/sagas/BookingWorkflow.kt)

#### Event processing
- **[Transactional Event Processing](README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/main/kotlin/my/example/eventtransactions/UserFeed.kt)
- **[Event Enrichment / Joins](README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt)

## Sagas
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/main/kotlin/my/example/sagas/BookingWorkflow.kt)

Expand Down Expand Up @@ -73,4 +78,178 @@ dev.restate.sdk.common.TerminalException: Failed to reserve the trip: 👻 Payme
```

</details>
</details>

## Transactional Event Processing
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/main/kotlin/my/example/eventtransactions/UserFeed.kt)

Processing events (from Kafka) to update various downstream systems.
- Durable side effects with retries and recovery of partial progress
- Events get sent to objects based on the Kafka key.
For each key, Restate ensures that events are processed sequentially and in order.
Slow events on other keys do not block processing (high fan-out, no head-of-line waiting).
- Ability to delay events when the downstream systems are busy, without blocking
entire partitions.

<details>
<summary><strong>Running the example</strong></summary>

1. Start the Kafka broker via Docker Compose: `docker compose up -d`.
2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml`
3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeedKt run`
4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`
5. Let Restate subscribe to the Kafka topic `social-media-posts` and invoke `UserFeed/processPost` on each message.
```shell
curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/social-media-posts",
"sink": "service://UserFeed/processPost",
"options": {"auto.offset.reset": "earliest"}
}'
```

Start a Kafka producer and send some messages to the `social-media-posts` topic:
```shell
docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic social-media-posts --property parse.key=true --property key.separator=:
```

Let's submit some posts for two different users:
```
userid1:{"content": "Hi! This is my first post!", "metadata": "public"}
userid2:{"content": "Hi! This is my first post!", "metadata": "public"}
userid1:{"content": "Hi! This is my second post!", "metadata": "public"}
```

Our Kafka broker only has a single partition so all these messages end up on the same partition.
You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially.


<details>
<summary><strong>View logs</strong></summary>

```shell
2025-02-27 15:53:37 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Creating post ee5b9dde-fc81-4819-a411-916e5c2b0c0d for user userid2
2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Creating post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca for user userid1
2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is still pending... Will check again in 5 seconds
2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is done
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Updating user feed for user userid1 with post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - End invocation
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Creating post 382f3687-fb11-49fa-912c-18a886dd1ecd for user userid1
2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is still pending... Will check again in 5 seconds
2025-02-27 15:53:48 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:54:23 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is done
2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Updating user feed for user userid1 with post 382f3687-fb11-49fa-912c-18a886dd1ecd
2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - End invocation
2025-02-27 15:54:33 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:54:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds
2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is done
2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Updating user feed for user userid2 with post ee5b9dde-fc81-4819-a411-916e5c2b0c0d
2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - End invocation
```

As you see, slow events do not block other slow events.
Restate effectively created a queue per user ID.

The handler creates the social media post and waits for content moderation to finish.
If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry.
The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish.

You can try it out by killing Restate or the service halfway through processing a post.

</details>
</details>

## Event Enrichment / Joins
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt)

This example shows an example of:
- **Event enrichment** over different sources: RPC and Kafka
- **Stateful actors / Digital twins** updated over Kafka
- **Streaming join**
- Populating state from events and making it queryable via RPC handlers.

The example implements a package delivery tracking service.
Packages are registered via an RPC handler, and their location is updated via Kafka events.
The Package Tracker Virtual Object tracks the package details and its location history.

<details>
<summary><strong>Running the example</strong></summary>

1. Start the Kafka broker via Docker Compose: `docker compose up -d`.

2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml`

3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTrackerKt run`

4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message.
```shell
curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/package-location-updates",
"sink": "service://PackageTracker/updateLocation",
"options": {"auto.offset.reset": "earliest"}
}'
```

6. Register a new package via the RPC handler:
```shell
curl localhost:8080/PackageTracker/package1/registerPackage \
-H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}'
```

7. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic:
```shell
docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=:
```
Send messages like
```
package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"}
package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"}
```

8. Query the package location via the RPC handler:
```shell
curl localhost:8080/PackageTracker/package1/getPackageInfo
```
or via the CLI: `restate kv get PackageTracker package1`

You can see how the state was enriched by the initial RPC event and the subsequent Kafka events:

<details>
<summary>See Output</summary>

```
🤖 State:
―――――――――

Service PackageTracker
Key package1

KEY VALUE
package-info {
"finalDestination": "Bridge 6, Amsterdam",
"locations": [
{
"location": "Pinetree Road 5, Paris",
"timestamp": "2024-10-10 13:00"
},
{
"location": "Mountain Road 155, Brussels",
"timestamp": "2024-10-10 14:00"
}
]
}
```

</details>

</details>
39 changes: 39 additions & 0 deletions kotlin/patterns-use-cases/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
version: '3'
services:
broker:
image: confluentinc/cp-kafka:7.5.0
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

init-kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
"# blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic social-media-posts --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server broker:29092 --list"
3 changes: 3 additions & 0 deletions kotlin/patterns-use-cases/restate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://localhost:9092"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package my.example.eventenrichment

import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.Shared
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.common.TerminalException
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.KtSerdes
import dev.restate.sdk.kotlin.ObjectContext
import dev.restate.sdk.kotlin.SharedObjectContext
import kotlinx.serialization.Serializable

@Serializable
data class PackageInfo(val finalDestination: String, val locations: MutableList<LocationUpdate> = mutableListOf())
@Serializable
data class LocationUpdate(val timestamp: String, val location: String)

@VirtualObject
class PackageTracker {

companion object {
private val PACKAGE_INFO = StateKey.of("package-info", KtSerdes.json<PackageInfo>())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be just KtStateKey.json<PackageInfo>("package-info")

}

@Handler
suspend fun registerPackage(ctx: ObjectContext, packageInfo: PackageInfo) {
ctx.set(PACKAGE_INFO, packageInfo)
}

@Handler
suspend fun updateLocation(ctx: ObjectContext, locationUpdate: LocationUpdate) {
val packageInfo = ctx.get(PACKAGE_INFO)
?: throw TerminalException("Package not found")

packageInfo.locations.add(locationUpdate)
ctx.set(PACKAGE_INFO, packageInfo)
}

@Shared
suspend fun getPackageInfo(ctx: SharedObjectContext): PackageInfo {
return ctx.get(PACKAGE_INFO)
?: throw TerminalException("Package not found")
}
}

fun main() {
RestateHttpEndpointBuilder.builder().bind(PackageTracker()).buildAndListen()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package my.example.eventtransactions

import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.ObjectContext
import dev.restate.sdk.kotlin.runBlock
import kotlinx.serialization.Serializable
import my.example.eventtransactions.utils.createPost
import my.example.eventtransactions.utils.getPostStatus
import my.example.eventtransactions.utils.updateUserFeed
import kotlin.time.Duration.Companion.milliseconds

@VirtualObject
class UserFeed {

@Serializable
data class SocialMediaPost(val content: String, val metadata: String)

@Handler
suspend fun processPost(ctx: ObjectContext, post: SocialMediaPost) {
val userId = ctx.key()

val postId = ctx.runBlock { createPost(userId, post) }

while (ctx.runBlock { getPostStatus(postId) } == "PENDING") {
ctx.sleep(5000.milliseconds)
}

ctx.runBlock { updateUserFeed(userId, postId) }
}
}

fun main() {
RestateHttpEndpointBuilder.builder().bind(UserFeed()).buildAndListen()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package my.example.eventtransactions.utils

import my.example.eventtransactions.UserFeed
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.UUID

private val logger: Logger = LogManager.getLogger("UserFeed")

fun createPost(userId: String, post: UserFeed.SocialMediaPost): String {
val postId = UUID.randomUUID().toString()
logger.info("Creating post {} for user {}", postId, userId)
return postId
}

fun getPostStatus(postId: String): String {
return if (Math.random() < 0.8) {
logger.info("Content moderation for post {} is still pending... Will check again in 5 seconds", postId)
"PENDING"
} else {
logger.info("Content moderation for post {} is done", postId)
"DONE"
}
}

fun updateUserFeed(userId: String, postId: String) {
logger.info("Updating user feed for user {} with post {}", userId, postId)
}
Loading