Skip to content

Commit

Permalink
Java enrichment example
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 17, 2024
1 parent f8d3ccb commit 748e7cd
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 108 deletions.
79 changes: 74 additions & 5 deletions java/patterns-use-cases/event-processing-enrichment/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,80 @@
# Hello world - Java HTTP example
# Event Processing Example: Event Enrichment

Sample project configuration of a Restate service using the Java SDK and HTTP server.
This example shows an example of:
- **Event enrichment** over different sources: RPC and Kafka
- **Stateful actors / Digital twins** updated over Kafka
- **Streaming join**
- Populating state from events and making it queryable via RPC handlers.

Have a look at the [Java Quickstart guide](https://docs.restate.dev/get_started/quickstart?sdk=java) for more information on how to use this project.
The example implements a package delivery tracking service.
Packages are registered via an RPC handler, and their location is updated via Kafka events.
The Package Tracker Virtual Object tracks the package details and its location history.

To run:
## Running the example

1. Start the Kafka broker via Docker Compose: `docker compose up -d`.

2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml`

3. Start the service: `./gradlew run`

4. Register the example at Restate server by calling
`restate -y deployment register localhost:9080`.

5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message.
```shell
curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/package-location-updates",
"sink": "service://PackageTracker/updateLocation",
"options": {"auto.offset.reset": "earliest"}
}'
```

## Demo scenario

1. Register a new package via the RPC handler:
```shell
./gradlew run
curl localhost:8080/PackageTracker/package1/registerPackage \
-H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}'
```

2. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic:
```shell
docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=:
```
Send messages like
```
package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"}
package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"}
```

3. Query the package location via the RPC handler:
```shell
curl localhost:8080/PackageTracker/package1/getPackageInfo
```
or via the CLI: `npx restate kv get package-tracker package1`

You can see how the state was enriched by the initial RPC event and the subsequent Kafka events:
```
🤖 State:
―――――――――
Service package-tracker
Key package1
KEY VALUE
package-info {
"finalDestination": "Bridge 6, Amsterdam",
"locations": [
{
"location": "Pinetree Road 5, Paris",
"timestamp": "2024-10-10 13:00"
},
{
"location": "Mountain Road 155, Brussels",
"timestamp": "2024-10-10 14:00"
}
]
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ dependencies {
// To use Jackson to read/write state entries (optional)
implementation("dev.restate:sdk-serde-jackson:$restateVersion")

// Jackson parameter names
// https://github.com/FasterXML/jackson-modules-java8/tree/2.14/parameter-names
implementation("com.fasterxml.jackson.module:jackson-module-parameter-names:2.16.1")
// Jackson java8 types
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.1")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1")

// Logging (optional)
implementation("org.apache.logging.log4j:log4j-core:2.24.1")

Expand All @@ -28,9 +35,15 @@ dependencies {
testImplementation("dev.restate:sdk-testing:$restateVersion")
}

tasks.withType<JavaCompile> {
// Using -parameters allows to use Jackson ParameterName feature
// https://github.com/FasterXML/jackson-modules-java8/tree/2.14/parameter-names
options.compilerArgs.add("-parameters")
}

// Set main class
application {
mainClass.set("my.example.Greeter")
mainClass.set("my.example.PackageTracker")
}

tasks.named<Test>("test") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: '3'
services:
broker:
image: confluentinc/cp-kafka:7.5.0
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

init-kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
"# blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server broker:29092 --list"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://localhost:9092"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package my.example;

import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.SharedObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.Shared;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import dev.restate.sdk.serde.jackson.JacksonSerdes;
import my.example.types.LocationUpdate;
import my.example.types.PackageInfo;

// Package tracking system:
// Digital twin representing a package in delivery with real-time location updates.
// Handlers get called over HTTP or Kafka.
@VirtualObject
public class PackageTracker {

private static final StateKey<PackageInfo> PACKAGE_INFO =
StateKey.of("package-info", JacksonSerdes.of(PackageInfo.class));

// Called first by the seller over HTTP
@Handler
public void registerPackage(ObjectContext ctx, PackageInfo packageInfo){
// Store the package details in the state
ctx.set(PACKAGE_INFO, packageInfo);
}

// Connected to a Kafka topic for real-time location updates
@Handler
public void updateLocation(ObjectContext ctx, LocationUpdate locationUpdate){
var packageInfo = ctx.get(PACKAGE_INFO)
.orElseThrow(() -> new TerminalException("Package not found"));

// Update the package info with the new location
packageInfo.addLocation(locationUpdate);
ctx.set(PACKAGE_INFO, packageInfo);
}

// Called by the delivery dashboard to get the package details
@Shared
public PackageInfo getPackageInfo(SharedObjectContext ctx){
return ctx.get(PACKAGE_INFO)
.orElseThrow(() -> new TerminalException("Package not found"));
}

public static void main(String[] args) {
RestateHttpEndpointBuilder.builder()
.bind(new PackageTracker())
.buildAndListen(9081);
}
}

// Example API Usage:
/*
curl localhost:8080/PackageTracker/package123/registerPackage -H 'content-type: application/json' -d '{ "finalDestination": "Bridge 6, Amsterdam"}'
curl localhost:8080/PackageTracker/package123/updateLocation -H 'content-type: application/json' -d '{ "timestamp": "2024-12-11T12:00:00Z", "location": "Warehouse A" }'
curl localhost:8080/PackageTracker/package123/getPackageInfo
*/

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package my.example.types;

public record LocationUpdate (String timestamp, String location) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package my.example.types;

import java.util.ArrayList;
import java.util.List;

public class PackageInfo {
private String finalDestination;
private List<LocationUpdate> locations = new ArrayList<>();

public PackageInfo(String finalDestination) {
this.finalDestination = finalDestination;
}

public String getFinalDestination() {
return finalDestination;
}

public void setFinalDestination(String finalDestination) {
this.finalDestination = finalDestination;
}

public List<LocationUpdate> getLocations() {
return locations;
}

public void setLocations(List<LocationUpdate> locations) {
this.locations = locations;
}

public void addLocation(LocationUpdate locationUpdate) {
this.locations.add(locationUpdate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ As you see, slow events do not block other slow events.
Restate effectively created a queue per user ID.
The handler creates the social media post and waits for content moderation to finish.
If the moderation takes long, and there is an infrastructure crash, then Restate will not recreate the post but will recover the post ID and will continue waiting for moderation to finish.
You can try it out by killing Restate or the service halfway through processing a post.
The retries and timers are tracked and managed resiliently by Restate.
If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry.
The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish.
You can try it out by killing Restate or the service halfway through processing a post.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ If the upload takes too long, however, the client asks the upload service to sen

2. Start Restate Server in a separate shell: `npx restate-server`

3. Start the data upload service: `npm run app-dev`
3. Start the service: `npm run app-dev`

4. Register the example at Restate server by calling
`npx restate -y deployment register "localhost:9080"`.
Expand Down
30 changes: 15 additions & 15 deletions typescript/patterns-use-cases/event-processing-enrichment/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The Package Tracker Virtual Object tracks the package details and its location h

3. Start Restate Server with the Kafka broker configuration in a separate shell: `npx restate-server --config-file restate.toml`

4. Start the data upload service: `npm run app-dev`
4. Start the service: `npm run app-dev`

5. Register the example at Restate server by calling
`npx restate -y deployment register "localhost:9080"`.
Expand Down Expand Up @@ -65,18 +65,18 @@ You can see how the state was enriched by the initial RPC event and the subseque
Service package-tracker
Key package1
KEY VALUE
details {
"finalDestination": "Bridge 6, Amsterdam",
"locations": [
{
"location": "Pinetree Road 5, Paris",
"timestamp": "2024-10-10 13:00"
},
{
"location": "Mountain Road 155, Brussels",
"timestamp": "2024-10-10 14:00"
}
]
}
KEY VALUE
package-info {
"finalDestination": "Bridge 6, Amsterdam",
"locations": [
{
"location": "Pinetree Road 5, Paris",
"timestamp": "2024-10-10 13:00"
},
{
"location": "Mountain Road 155, Brussels",
"timestamp": "2024-10-10 14:00"
}
]
}
```
Loading

0 comments on commit 748e7cd

Please sign in to comment.