From c93c7b097cf76057571267c4ed4fdb0938d8ad97 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:03:34 +0100 Subject: [PATCH 1/4] Add event processing patterns for Kotlin --- kotlin/patterns-use-cases/README.md | 174 ++++++++++++++++++ kotlin/patterns-use-cases/docker-compose.yaml | 39 ++++ kotlin/patterns-use-cases/restate.toml | 3 + .../example/eventenrichment/PackageTracker.kt | 49 +++++ .../my/example/eventtransactions/UserFeed.kt | 37 ++++ .../example/eventtransactions/utils/stubs.kt | 28 +++ 6 files changed, 330 insertions(+) create mode 100644 kotlin/patterns-use-cases/docker-compose.yaml create mode 100644 kotlin/patterns-use-cases/restate.toml create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt create mode 100644 kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 3c905378..850774cd 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -73,4 +73,178 @@ dev.restate.sdk.common.TerminalException: Failed to reserve the trip: 👻 Payme ``` + + +## Transactional Event Processing +[](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) + +Processing events (from Kafka) to update various downstream systems. +- Durable side effects with retries and recovery of partial progress +- Events get sent to objects based on the Kafka key. + For each key, Restate ensures that events are processed sequentially and in order. + Slow events on other keys do not block processing (high fan-out, no head-of-line waiting). +- Ability to delay events when the downstream systems are busy, without blocking + entire partitions. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. +2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` +3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeedKt run` +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` +5. Let Restate subscribe to the Kafka topic `social-media-posts` and invoke `UserFeed/processPost` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/social-media-posts", + "sink": "service://UserFeed/processPost", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +Start a Kafka producer and send some messages to the `social-media-posts` topic: +```shell +docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic social-media-posts --property parse.key=true --property key.separator=: +``` + +Let's submit some posts for two different users: +``` +userid1:{"content": "Hi! This is my first post!", "metadata": "public"} +userid2:{"content": "Hi! This is my first post!", "metadata": "public"} +userid1:{"content": "Hi! This is my second post!", "metadata": "public"} +``` + +Our Kafka broker only has a single partition so all these messages end up on the same partition. +You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially. + + +
+View logs + +```shell +2025-02-27 15:53:37 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Creating post ee5b9dde-fc81-4819-a411-916e5c2b0c0d for user userid2 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Creating post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca for user userid1 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is still pending... Will check again in 5 seconds +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is done +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Updating user feed for user userid1 with post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Creating post 382f3687-fb11-49fa-912c-18a886dd1ecd for user userid1 +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is still pending... Will check again in 5 seconds +2025-02-27 15:53:48 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:23 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is done +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Updating user feed for user userid1 with post 382f3687-fb11-49fa-912c-18a886dd1ecd +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:54:33 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is done +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Updating user feed for user userid2 with post ee5b9dde-fc81-4819-a411-916e5c2b0c0d +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` + +As you see, slow events do not block other slow events. +Restate effectively created a queue per user ID. + +The handler creates the social media post and waits for content moderation to finish. +If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry. +The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish. + +You can try it out by killing Restate or the service halfway through processing a post. + +
+
+ +## Event Enrichment / Joins +[](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + +This example shows an example of: +- **Event enrichment** over different sources: RPC and Kafka +- **Stateful actors / Digital twins** updated over Kafka +- **Streaming join** +- Populating state from events and making it queryable via RPC handlers. + +The example implements a package delivery tracking service. +Packages are registered via an RPC handler, and their location is updated via Kafka events. +The Package Tracker Virtual Object tracks the package details and its location history. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. + +2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` + +3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTrackerKt run` + +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` + +5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/package-location-updates", + "sink": "service://PackageTracker/updateLocation", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +6. Register a new package via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/registerPackage \ + -H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}' + ``` + +7. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic: + ```shell + docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=: + ``` + Send messages like + ``` + package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"} + package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"} + ``` + +8. Query the package location via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/getPackageInfo + ``` + or via the CLI: `restate kv get PackageTracker package1` + + You can see how the state was enriched by the initial RPC event and the subsequent Kafka events: + +
+ See Output + + ``` + 🤖 State: + ――――――――― + + Service PackageTracker + Key package1 + + KEY VALUE + package-info { + "finalDestination": "Bridge 6, Amsterdam", + "locations": [ + { + "location": "Pinetree Road 5, Paris", + "timestamp": "2024-10-10 13:00" + }, + { + "location": "Mountain Road 155, Brussels", + "timestamp": "2024-10-10 14:00" + } + ] + } + ``` + +
+
\ No newline at end of file diff --git a/kotlin/patterns-use-cases/docker-compose.yaml b/kotlin/patterns-use-cases/docker-compose.yaml new file mode 100644 index 00000000..2e33e605 --- /dev/null +++ b/kotlin/patterns-use-cases/docker-compose.yaml @@ -0,0 +1,39 @@ +version: '3' +services: + broker: + image: confluentinc/cp-kafka:7.5.0 + container_name: broker + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + + init-kafka: + image: confluentinc/cp-kafka:7.5.0 + depends_on: + - broker + entrypoint: [ '/bin/sh', '-c' ] + command: | + "# blocks until kafka is reachable + kafka-topics --bootstrap-server broker:29092 --list + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic social-media-posts --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server broker:29092 --list" \ No newline at end of file diff --git a/kotlin/patterns-use-cases/restate.toml b/kotlin/patterns-use-cases/restate.toml new file mode 100644 index 00000000..8a0bde1c --- /dev/null +++ b/kotlin/patterns-use-cases/restate.toml @@ -0,0 +1,3 @@ +[[ingress.kafka-clusters]] +name = "my-cluster" +brokers = ["PLAINTEXT://localhost:9092"] \ No newline at end of file diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt new file mode 100644 index 00000000..bf69c5e8 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt @@ -0,0 +1,49 @@ +package my.example.eventenrichment + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.common.StateKey +import dev.restate.sdk.common.TerminalException +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.KtSerdes +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.SharedObjectContext +import kotlinx.serialization.Serializable + +@Serializable +data class PackageInfo(val finalDestination: String, val locations: MutableList = mutableListOf()) +@Serializable +data class LocationUpdate(val timestamp: String, val location: String) + +@VirtualObject +class PackageTracker { + + companion object { + private val PACKAGE_INFO = StateKey.of("package-info", KtSerdes.json()) + } + + @Handler + suspend fun registerPackage(ctx: ObjectContext, packageInfo: PackageInfo) { + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Handler + suspend fun updateLocation(ctx: ObjectContext, locationUpdate: LocationUpdate) { + val packageInfo = ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + + packageInfo.locations.add(locationUpdate) + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Shared + suspend fun getPackageInfo(ctx: SharedObjectContext): PackageInfo { + return ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(PackageTracker()).buildAndListen() +} diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt new file mode 100644 index 00000000..7dd04515 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt @@ -0,0 +1,37 @@ +package my.example.eventtransactions + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.runBlock +import kotlinx.serialization.Serializable +import my.example.eventtransactions.utils.createPost +import my.example.eventtransactions.utils.getPostStatus +import my.example.eventtransactions.utils.updateUserFeed +import kotlin.time.Duration.Companion.milliseconds + +@VirtualObject +class UserFeed { + + @Serializable + data class SocialMediaPost(val content: String, val metadata: String) + + @Handler + suspend fun processPost(ctx: ObjectContext, post: SocialMediaPost) { + val userId = ctx.key() + + val postId = ctx.runBlock { createPost(userId, post) } + + while (ctx.runBlock { getPostStatus(postId) } == "PENDING") { + ctx.sleep(5000.milliseconds) + } + + ctx.runBlock { updateUserFeed(userId, postId) } + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(UserFeed()).buildAndListen() +} + diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt new file mode 100644 index 00000000..e940cbb1 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt @@ -0,0 +1,28 @@ +package my.example.eventtransactions.utils + +import my.example.eventtransactions.UserFeed +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import java.util.UUID + +private val logger: Logger = LogManager.getLogger("UserFeed") + +fun createPost(userId: String, post: UserFeed.SocialMediaPost): String { + val postId = UUID.randomUUID().toString() + logger.info("Creating post {} for user {}", postId, userId) + return postId +} + +fun getPostStatus(postId: String): String { + return if (Math.random() < 0.8) { + logger.info("Content moderation for post {} is still pending... Will check again in 5 seconds", postId) + "PENDING" + } else { + logger.info("Content moderation for post {} is done", postId) + "DONE" + } +} + +fun updateUserFeed(userId: String, postId: String) { + logger.info("Updating user feed for user {} with post {}", userId, postId) +} \ No newline at end of file From b57fefc82f797db2c0224a82d1f39f9c1011419b Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:11:50 +0100 Subject: [PATCH 2/4] Add Kotlin event processing examples to the readmes --- kotlin/patterns-use-cases/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 850774cd..48a9eb6c 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -1,7 +1,12 @@ # Kotlin Patterns and Use Cases +#### Orchestration patterns - **[Sagas](README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + ## Sagas [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) From 2a3d67e1918166a7a46a7d6dad6d1f0eecb736f3 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:12:37 +0100 Subject: [PATCH 3/4] Add Kotlin event processing examples to the readmes --- README.md | 34 +++++++++++++++++----------------- kotlin/README.md | 5 +++++ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 59c6e14a..d604f4d3 100644 --- a/README.md +++ b/README.md @@ -52,23 +52,23 @@ Or have a look at the general catalog below: #### Use Cases and Patterns -| Example Name | Languages | -|---------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | -| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | -| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | -| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | -| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | -| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | -| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | -| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | -| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | -| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | -| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | -| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) | -| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) | -| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | -| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | +| Example Name | Languages | +|---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | +| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | +| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | +| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | +| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | +| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | +| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | +| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | +| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | +| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | +| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | +| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) [](kotlin/patterns-use-cases/README.md#transactional-event-processing) | +| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) [](kotlin/patterns-use-cases/README.md#event-enrichment--joins) | +| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | +| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | #### Integrations diff --git a/kotlin/README.md b/kotlin/README.md index fb65446f..7c7ec5d8 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -9,8 +9,13 @@ ## Use Cases and Patterns +#### Orchestration patterns - **[Sagas](patterns-use-cases/README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](patterns-use-cases/src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](patterns-use-cases/README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](patterns-use-cases/README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + ## Integrations Examples integrating Restate with other tools and frameworks: From c819360e8fb7e30c67b783ac9c670a3810dd1a41 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 27 Feb 2025 16:14:08 +0100 Subject: [PATCH 4/4] Fix kotlin side effect run to runBlock --- .../src/main/kotlin/durable_execution/SubscriptionService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt index aa395783..8d9c2ed1 100644 --- a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt +++ b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt @@ -45,7 +45,7 @@ class SubscriptionService { } for (subscription in req.subscriptions) { - ctx.run { + ctx.runBlock { createSubscription(req.userId, subscription, payRef) } }