diff --git a/README.md b/README.md index 59c6e14a..24267560 100644 --- a/README.md +++ b/README.md @@ -52,23 +52,23 @@ Or have a look at the general catalog below: #### Use Cases and Patterns -| Example Name | Languages | -|---------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | -| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) | -| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | -| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | -| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | -| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | -| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | -| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | -| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | -| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | -| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | -| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) | -| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) | -| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | -| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | +| Example Name | Languages | +|---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Durable RPC, Idempotency & Concurrency | [](typescript/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](go/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](python/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) [](java/patterns-use-cases/README.md#durable-rpc-idempotency--concurrency) | +| \(Delayed\) Message Queue | [](typescript/patterns-use-cases/README.md#delayed-message-queue) [](go/patterns-use-cases/README.md#delayed-message-queue) [](python/patterns-use-cases/README.md#delayed-message-queue) [](java/patterns-use-cases/README.md#delayed-message-queue) [](kotlin/patterns-use-cases/README.md#delayed-message-queue) | +| Webhook Callbacks | [](typescript/patterns-use-cases/README.md#webhook-callbacks) [](go/patterns-use-cases/README.md#webhook-callbacks) | +| Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | +| Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | +| Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | +| Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | +| Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | +| Payment State Machines \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-state-machines) [](python/patterns-use-cases/README.md#payment-state-machines) [](java/patterns-use-cases/README.md#payment-state-machines) | +| Scheduling Tasks | [](typescript/patterns-use-cases/README.md#scheduling-tasks) [](go/patterns-use-cases/README.md#scheduling-tasks) | +| Parallelizing Work | [](typescript/patterns-use-cases/README.md#parallelizing-work) [](go/patterns-use-cases/README.md#parallelizing-work) [](python/patterns-use-cases/README.md#parallelizing-work) [](java/patterns-use-cases/README.md#parallelizing-work) | +| Transactional Event Processing | [](typescript/patterns-use-cases/README.md#transactional-event-processing) [](go/patterns-use-cases/README.md#transactional-event-processing) [](python/patterns-use-cases/README.md#transactional-event-processing) [](java/patterns-use-cases/README.md#transactional-event-processing) [](kotlin/patterns-use-cases/README.md#transactional-event-processing) | +| Event Enrichment / Joins | [](typescript/patterns-use-cases/README.md#event-enrichment--joins) [](go/patterns-use-cases/README.md#event-enrichment--joins) [](python/patterns-use-cases/README.md#event-enrichment--joins) [](java/patterns-use-cases/README.md#event-enrichment--joins) [](kotlin/patterns-use-cases/README.md#event-enrichment--joins) | +| Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | +| Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | #### Integrations diff --git a/go/README.md b/go/README.md index 848fe43c..cc0c69f4 100644 --- a/go/README.md +++ b/go/README.md @@ -1,5 +1,8 @@ # Go Example Catalog +## Prerequisites +- Go >= 1.21.0 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/part0/durableexecution.go) diff --git a/java/README.md b/java/README.md index 193d13c9..849c26b8 100644 --- a/java/README.md +++ b/java/README.md @@ -1,5 +1,8 @@ # Java Example Catalog +## Prerequisites +- JDK >= 17 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/main/java/durable_execution/SubscriptionService.java) diff --git a/java/basics/src/main/java/workflows/SignupWorkflow.java b/java/basics/src/main/java/workflows/SignupWorkflow.java index c7239e79..ba0442e5 100644 --- a/java/basics/src/main/java/workflows/SignupWorkflow.java +++ b/java/basics/src/main/java/workflows/SignupWorkflow.java @@ -39,8 +39,6 @@ public class SignupWorkflow { // References to K/V state and promises stored in Restate private static final DurablePromiseKey EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); - private static final StateKey ONBOARDING_STATUS = - StateKey.of("status", JsonSerdes.STRING); // --- The workflow logic --- @Workflow diff --git a/kotlin/README.md b/kotlin/README.md index fb65446f..92121a31 100644 --- a/kotlin/README.md +++ b/kotlin/README.md @@ -1,5 +1,8 @@ # Kotlin Example Catalog +## Prerequisites +- JDK >= 17 + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/main/kotlin/durable_execution/SubscriptionService.kt) @@ -9,8 +12,16 @@ ## Use Cases and Patterns +#### Communication +- **[(Delayed) Message Queue](patterns-use-cases/README.md#delayed-message-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [](patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt) + +#### Orchestration patterns - **[Sagas](patterns-use-cases/README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](patterns-use-cases/src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](patterns-use-cases/README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](patterns-use-cases/README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + ## Integrations Examples integrating Restate with other tools and frameworks: diff --git a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt index aa395783..8d9c2ed1 100644 --- a/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt +++ b/kotlin/basics/src/main/kotlin/durable_execution/SubscriptionService.kt @@ -45,7 +45,7 @@ class SubscriptionService { } for (subscription in req.subscriptions) { - ctx.run { + ctx.runBlock { createSubscription(req.userId, subscription, payRef) } } diff --git a/kotlin/patterns-use-cases/README.md b/kotlin/patterns-use-cases/README.md index 3c905378..adc7e226 100644 --- a/kotlin/patterns-use-cases/README.md +++ b/kotlin/patterns-use-cases/README.md @@ -1,7 +1,26 @@ # Kotlin Patterns and Use Cases +### Communication +- **[(Delayed) Message Queue](README.md#delayed-message-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [](src/main/kotlin/my/example/queue/TaskSubmitter.kt) + +#### Orchestration patterns - **[Sagas](README.md#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) +#### Event processing +- **[Transactional Event Processing](README.md#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) +- **[Event Enrichment / Joins](README.md#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + +## (Delayed) Message Queue +[](src/main/kotlin/my/example/queue/TaskSubmitter.kt) + +Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. + +- [Task Submitter](src/main/kotlin/my/example/queue/TaskSubmitter.kt): schedules tasks via send requests with and idempotency key. + - The **send requests** put the tasks in Restate's queue. The task submitter does not wait for the task response. + - The **idempotency key** in the header is used by Restate to deduplicate requests. + - If a delay is set, the task will be executed later and Restate will track the timer durably, like a **delayed task queue**. +- [Async Task Worker](src/main/kotlin/my/example/queue/AsyncTaskService.kt): gets invoked by Restate for each task in the queue. + ## Sagas [](src/main/kotlin/my/example/sagas/BookingWorkflow.kt) @@ -73,4 +92,178 @@ dev.restate.sdk.common.TerminalException: Failed to reserve the trip: 👻 Payme ``` + + +## Transactional Event Processing +[](src/main/kotlin/my/example/eventtransactions/UserFeed.kt) + +Processing events (from Kafka) to update various downstream systems. +- Durable side effects with retries and recovery of partial progress +- Events get sent to objects based on the Kafka key. + For each key, Restate ensures that events are processed sequentially and in order. + Slow events on other keys do not block processing (high fan-out, no head-of-line waiting). +- Ability to delay events when the downstream systems are busy, without blocking + entire partitions. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. +2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` +3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeedKt run` +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` +5. Let Restate subscribe to the Kafka topic `social-media-posts` and invoke `UserFeed/processPost` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/social-media-posts", + "sink": "service://UserFeed/processPost", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +Start a Kafka producer and send some messages to the `social-media-posts` topic: +```shell +docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic social-media-posts --property parse.key=true --property key.separator=: +``` + +Let's submit some posts for two different users: +``` +userid1:{"content": "Hi! This is my first post!", "metadata": "public"} +userid2:{"content": "Hi! This is my first post!", "metadata": "public"} +userid1:{"content": "Hi! This is my second post!", "metadata": "public"} +``` + +Our Kafka broker only has a single partition so all these messages end up on the same partition. +You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially. + + +
+View logs + +```shell +2025-02-27 15:53:37 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Creating post ee5b9dde-fc81-4819-a411-916e5c2b0c0d for user userid2 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Creating post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca for user userid1 +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is still pending... Will check again in 5 seconds +2025-02-27 15:53:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Content moderation for post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca is done +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] UserFeed - Updating user feed for user userid1 with post ea2eb2e4-aeb1-4cee-a903-a6399f0ee6ca +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN7MoaUqxxd7a9qfkzzBSkzT] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Creating post 382f3687-fb11-49fa-912c-18a886dd1ecd for user userid1 +2025-02-27 15:53:43 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is still pending... Will check again in 5 seconds +2025-02-27 15:53:48 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:23 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Content moderation for post 382f3687-fb11-49fa-912c-18a886dd1ecd is done +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] UserFeed - Updating user feed for user userid1 with post 382f3687-fb11-49fa-912c-18a886dd1ecd +2025-02-27 15:54:28 INFO [UserFeed/processPost][inv_13puWeoWJykN6geV0KhVhI46atSq8tEE1j] dev.restate.sdk.core.InvocationStateMachine - End invocation +2025-02-27 15:54:33 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:54:38 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is still pending... Will check again in 5 seconds +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Content moderation for post ee5b9dde-fc81-4819-a411-916e5c2b0c0d is done +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] UserFeed - Updating user feed for user userid2 with post ee5b9dde-fc81-4819-a411-916e5c2b0c0d +2025-02-27 15:55:03 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl6kXNXP0jrPUYtCc9mabKet] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` + +As you see, slow events do not block other slow events. +Restate effectively created a queue per user ID. + +The handler creates the social media post and waits for content moderation to finish. +If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry. +The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish. + +You can try it out by killing Restate or the service halfway through processing a post. + +
+
+ +## Event Enrichment / Joins +[](src/main/kotlin/my/example/eventenrichment/PackageTracker.kt) + +This example shows an example of: +- **Event enrichment** over different sources: RPC and Kafka +- **Stateful actors / Digital twins** updated over Kafka +- **Streaming join** +- Populating state from events and making it queryable via RPC handlers. + +The example implements a package delivery tracking service. +Packages are registered via an RPC handler, and their location is updated via Kafka events. +The Package Tracker Virtual Object tracks the package details and its location history. + +
+Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. + +2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` + +3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTrackerKt run` + +4. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` + +5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message. + ```shell + curl localhost:9070/subscriptions -H 'content-type: application/json' \ + -d '{ + "source": "kafka://my-cluster/package-location-updates", + "sink": "service://PackageTracker/updateLocation", + "options": {"auto.offset.reset": "earliest"} + }' + ``` + +6. Register a new package via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/registerPackage \ + -H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}' + ``` + +7. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic: + ```shell + docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=: + ``` + Send messages like + ``` + package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"} + package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"} + ``` + +8. Query the package location via the RPC handler: + ```shell + curl localhost:8080/PackageTracker/package1/getPackageInfo + ``` + or via the CLI: `restate kv get PackageTracker package1` + + You can see how the state was enriched by the initial RPC event and the subsequent Kafka events: + +
+ See Output + + ``` + 🤖 State: + ――――――――― + + Service PackageTracker + Key package1 + + KEY VALUE + package-info { + "finalDestination": "Bridge 6, Amsterdam", + "locations": [ + { + "location": "Pinetree Road 5, Paris", + "timestamp": "2024-10-10 13:00" + }, + { + "location": "Mountain Road 155, Brussels", + "timestamp": "2024-10-10 14:00" + } + ] + } + ``` + +
+
\ No newline at end of file diff --git a/kotlin/patterns-use-cases/docker-compose.yaml b/kotlin/patterns-use-cases/docker-compose.yaml new file mode 100644 index 00000000..2e33e605 --- /dev/null +++ b/kotlin/patterns-use-cases/docker-compose.yaml @@ -0,0 +1,39 @@ +version: '3' +services: + broker: + image: confluentinc/cp-kafka:7.5.0 + container_name: broker + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + + init-kafka: + image: confluentinc/cp-kafka:7.5.0 + depends_on: + - broker + entrypoint: [ '/bin/sh', '-c' ] + command: | + "# blocks until kafka is reachable + kafka-topics --bootstrap-server broker:29092 --list + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic social-media-posts --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server broker:29092 --list" \ No newline at end of file diff --git a/kotlin/patterns-use-cases/restate.toml b/kotlin/patterns-use-cases/restate.toml new file mode 100644 index 00000000..8a0bde1c --- /dev/null +++ b/kotlin/patterns-use-cases/restate.toml @@ -0,0 +1,3 @@ +[[ingress.kafka-clusters]] +name = "my-cluster" +brokers = ["PLAINTEXT://localhost:9092"] \ No newline at end of file diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt new file mode 100644 index 00000000..2a4814a0 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventenrichment/PackageTracker.kt @@ -0,0 +1,48 @@ +package my.example.eventenrichment + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.common.TerminalException +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.KtStateKey +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.SharedObjectContext +import kotlinx.serialization.Serializable + +@Serializable +data class PackageInfo(val finalDestination: String, val locations: MutableList = mutableListOf()) +@Serializable +data class LocationUpdate(val timestamp: String, val location: String) + +@VirtualObject +class PackageTracker { + + companion object { + private val PACKAGE_INFO = KtStateKey.json("package-info") + } + + @Handler + suspend fun registerPackage(ctx: ObjectContext, packageInfo: PackageInfo) { + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Handler + suspend fun updateLocation(ctx: ObjectContext, locationUpdate: LocationUpdate) { + val packageInfo = ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + + packageInfo.locations.add(locationUpdate) + ctx.set(PACKAGE_INFO, packageInfo) + } + + @Shared + suspend fun getPackageInfo(ctx: SharedObjectContext): PackageInfo { + return ctx.get(PACKAGE_INFO) + ?: throw TerminalException("Package not found") + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(PackageTracker()).buildAndListen() +} diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt new file mode 100644 index 00000000..7dd04515 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/UserFeed.kt @@ -0,0 +1,37 @@ +package my.example.eventtransactions + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.runBlock +import kotlinx.serialization.Serializable +import my.example.eventtransactions.utils.createPost +import my.example.eventtransactions.utils.getPostStatus +import my.example.eventtransactions.utils.updateUserFeed +import kotlin.time.Duration.Companion.milliseconds + +@VirtualObject +class UserFeed { + + @Serializable + data class SocialMediaPost(val content: String, val metadata: String) + + @Handler + suspend fun processPost(ctx: ObjectContext, post: SocialMediaPost) { + val userId = ctx.key() + + val postId = ctx.runBlock { createPost(userId, post) } + + while (ctx.runBlock { getPostStatus(postId) } == "PENDING") { + ctx.sleep(5000.milliseconds) + } + + ctx.runBlock { updateUserFeed(userId, postId) } + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(UserFeed()).buildAndListen() +} + diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt new file mode 100644 index 00000000..e940cbb1 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/eventtransactions/utils/stubs.kt @@ -0,0 +1,28 @@ +package my.example.eventtransactions.utils + +import my.example.eventtransactions.UserFeed +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import java.util.UUID + +private val logger: Logger = LogManager.getLogger("UserFeed") + +fun createPost(userId: String, post: UserFeed.SocialMediaPost): String { + val postId = UUID.randomUUID().toString() + logger.info("Creating post {} for user {}", postId, userId) + return postId +} + +fun getPostStatus(postId: String): String { + return if (Math.random() < 0.8) { + logger.info("Content moderation for post {} is still pending... Will check again in 5 seconds", postId) + "PENDING" + } else { + logger.info("Content moderation for post {} is done", postId) + "DONE" + } +} + +fun updateUserFeed(userId: String, postId: String) { + logger.info("Updating user feed for user {} with post {}", userId, postId) +} \ No newline at end of file diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt new file mode 100644 index 00000000..93f12e96 --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/AsyncTaskService.kt @@ -0,0 +1,26 @@ +package my.example.queue + +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder +import dev.restate.sdk.kotlin.Context +import kotlinx.serialization.Serializable + +@Service +class AsyncTaskService { + @Handler + suspend fun runTask(ctx: Context, params: TaskOpts): String { + return params.someHeavyWork() + } +} + +fun main() { + RestateHttpEndpointBuilder.builder().bind(AsyncTaskService()).buildAndListen() +} + +@Serializable +class TaskOpts + +fun TaskOpts.someHeavyWork(): String { + return "someHeavyWork" +} diff --git a/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt new file mode 100644 index 00000000..c8b624ef --- /dev/null +++ b/kotlin/patterns-use-cases/src/main/kotlin/my/example/queue/TaskSubmitter.kt @@ -0,0 +1,46 @@ +package my.example.queue + +import dev.restate.sdk.client.CallRequestOptions +import dev.restate.sdk.client.Client +import dev.restate.sdk.kotlin.KtSerdes + +/* + * Restate is as a sophisticated task queue, with extra features like: + * - delaying execution and reliable timers + * - stateful tasks + * - queues per key (>< per partition; slow tasks for a key don't block others) + * - retries and recovery upon failures + * + * Every handler in Restate is executed asynchronously and can be treated + * as a reliable asynchronous task. + */ +class TaskSubmitter { + companion object { + private val restateClient: Client = Client.connect("http://localhost:8080") + } + + suspend fun TaskOpts.scheduleTask() { + // submit the task; similar to publishing a message to a queue + // Restate ensures the task is executed exactly once + val handle = + AsyncTaskServiceClient.fromClient(restateClient) + // optionally add a delay to execute the task later + .send(/*5.days*/) + .runTask( + this, + // use a stable uuid as an idempotency key; Restate deduplicates for us + CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ"), + ) + + + // ... do other things while the task is being processed ... + + // await the handler's result; optionally from another process + val result = + restateClient.invocationHandle( + handle.invocationId, + KtSerdes.json(), + ) + .attach() + } +} diff --git a/python/README.md b/python/README.md index b97f7642..903c4841 100644 --- a/python/README.md +++ b/python/README.md @@ -1,5 +1,8 @@ # Python Example Catalog +## Prerequisites +- Python >= v3.11 + ## Basics Learn the key concepts of Restate: diff --git a/rust/README.md b/rust/README.md index 19caa048..84995d28 100644 --- a/rust/README.md +++ b/rust/README.md @@ -1,5 +1,8 @@ # Rust Example Catalog +## Prerequisites +- [Rust](https://rustup.rs/) + ## Basics - **[Services - Durable Execution](basics)**: Making code resilient to failures via automatic retries and recovery of previously finished actions. [](basics/src/p0_durable_execution.rs) diff --git a/typescript/README.md b/typescript/README.md index 6daf2282..977b13c9 100644 --- a/typescript/README.md +++ b/typescript/README.md @@ -1,5 +1,9 @@ # TypeScript Example Catalog +## Prerequisites +- NodeJS >= v18.17.1 +- npm CLI >= 9.6.7 + ## Basics Learn the key concepts of Restate: