Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sdks #148

Merged
merged 3 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion basics/basics-java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repositories {
mavenCentral()
}

val restateVersion = "0.9.2"
val restateVersion = "1.0.0"

dependencies {
annotationProcessor("dev.restate:sdk-api-gen:$restateVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package durable_execution;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.Context;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.Service;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import utils.Permission;
import durable_execution.utils.UpdateRequest;
Expand Down Expand Up @@ -45,7 +45,7 @@ public void applyRoleUpdate(Context ctx, UpdateRequest req) {
// Apply a change to one system (e.g., DB upsert, API call, ...).
// The side effect persists the result with a consensus method so
// any later code relies on a deterministic result.
boolean success = ctx.run(CoreSerdes.JSON_BOOLEAN, () ->
boolean success = ctx.run(JsonSerdes.BOOLEAN, () ->
applyUserRole(req.getUserId(), req.getRole()));
if (!success) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
// entire partitions.
//

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import events_state.ProfileService;
import utils.UserUpdate;

import java.time.Duration;
Expand Down Expand Up @@ -54,19 +53,19 @@ public class UserUpdatesService {
public void updateUserEvent(ObjectContext ctx, UserUpdate update) {

// event handler is a durably executed function that can use all the features of Restate
String userId = ctx.run(CoreSerdes.JSON_STRING, () -> updateUserProfile(update.getProfile()));
String userId = ctx.run(JsonSerdes.STRING, () -> updateUserProfile(update.getProfile()));
while(userId.equals("NOT_READY")) {
// Delay the processing of the event by sleeping.
// The other events for this Virtual Object / key are queued.
// Events for other keys are processed concurrently.
// The sleep suspends the function (e.g., when running on FaaS).
ctx.sleep(Duration.ofMillis(5000));
userId = ctx.run(CoreSerdes.JSON_STRING, () -> updateUserProfile(update.getProfile()));
userId = ctx.run(JsonSerdes.STRING, () -> updateUserProfile(update.getProfile()));
}


String finalUserId = userId;
String roleId = ctx.run(CoreSerdes.JSON_STRING,
String roleId = ctx.run(JsonSerdes.STRING,
() -> setUserPermissions(finalUserId, update.getPermissions()));
ctx.run(() -> provisionResources(finalUserId, roleId, update.getResources()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package events_state;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import utils.UserProfile;
Expand All @@ -27,10 +27,10 @@
public class ProfileService {

private static final StateKey<String> NAME =
StateKey.of("name", CoreSerdes.JSON_STRING);
StateKey.of("name", JsonSerdes.STRING);

private static final StateKey<String> EMAIL =
StateKey.of("email", CoreSerdes.JSON_STRING);
StateKey.of("email", JsonSerdes.STRING);

@Handler
public void registration(ObjectContext ctx, String name){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
*/
package virtual_objects;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;

Expand All @@ -32,7 +32,7 @@
public class GreeterObject {

private static final StateKey<Integer> COUNT =
StateKey.of("available-drivers", CoreSerdes.JSON_INT);
StateKey.of("available-drivers", JsonSerdes.INT);

@Handler
public String greet(ObjectContext ctx, String greeting) {
Expand Down
3 changes: 2 additions & 1 deletion basics/basics-typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"example-4": "ts-node-dev --transpile-only src/4_virtual_objects.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^0.9.2"
"@restatedev/restate-sdk": "^1.0.0",
"@restatedev/restate-sdk-clients": "^1.0.0"
},
"devDependencies": {
"@restatedev/restate": "^0.9.2",
Expand Down
117 changes: 60 additions & 57 deletions basics/basics-typescript/src/3_workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/

import * as restate from "@restatedev/restate-sdk";
import { workflow as wf } from "@restatedev/restate-sdk";
import * as restateClients from "@restatedev/restate-sdk-clients";
import { createUserEntry, sendEmailWithLink } from "./utils/workflow_stubs";

//
Expand All @@ -23,87 +23,90 @@ import { createUserEntry, sendEmailWithLink } from "./utils/workflow_stubs";
// Workflow instances always have a unique ID that identifies the workflow execution.
// Each workflow instance (ID) can run only once (to success or failure).
//
const myWorkflow = wf.workflow("usersignup", {
const myWorkflow = restate.workflow({
// --- The workflow logic is in the run() function ---
name: "usersignup",
handlers: {
run: async (ctx: restate.WorkflowContext, params: { name: string; email: string }) => {
const {name, email} = params;
const userId = ctx.key;

// publish state, for the world to see our progress
ctx.set("stage", "Creating User");

// use all the standard durable execution features here
await ctx.run(() => createUserEntry({userId, name}));

ctx.set("stage", "Email Verification");

// send the email with the verification secret
const secret = await ctx.run(() => crypto.randomUUID());
ctx.run(() => sendEmailWithLink({email, secret}));

try {
// the promise here is resolved or rejected by the additional workflow methods below
const clickSecret = await ctx.promise<string>("email-link");
if (clickSecret !== secret) {
throw new restate.TerminalError("Wrong secret from email link");
}
} catch (err: any) {
ctx.set("stage", "Verification failed: " + err.message);
return;
}

run: async (ctx: wf.WfContext, params: { name: string; email: string }) => {
const { name, email } = params;
const userId = ctx.workflowId();

// publish state, for the world to see our progress
ctx.set("stage", "Creating User");
ctx.set("stage", "User verified");
},

// use all the standard durable execution features here
await ctx.run(() => createUserEntry({ userId, name }));
// --- various interactions for queries and signals ---

ctx.set("stage", "Email Verification");
getStage: (ctx: restate.WorkflowSharedContext) => {
// read the state to get the stage where the workflow is
return ctx.get("stage");
},

// send the email with the verification secret
const secret = await ctx.run(() => crypto.randomUUID());
ctx.run(() => sendEmailWithLink({ email, secret }));
verifyEmail: async (ctx: restate.WorkflowSharedContext, request: { secret: string }) => {
// resolve the durable promise to let the awaiter know
await ctx.promise<string>("email-link").resolve(request.secret);
},

try {
// the promise here is resolved or rejected by the additional workflow methods below
const clickSecret = await ctx.promise<string>("email-link");
if (clickSecret !== secret) {
throw new restate.TerminalError("Wrong secret from email link");
}
} catch (err: any) {
ctx.set("stage", "Verification failed: " + err.message);
return;
}

ctx.set("stage", "User verified");
},

// --- various interactions for queries and signals ---

getStage: (ctx: wf.SharedWfContext) => {
// read the state to get the stage where the workflow is
return ctx.get("stage");
},

verifyEmail: async (ctx: wf.SharedWfContext, request: { secret: string }) => {
// resolve the durable promise to let the awaiter know
ctx.promise<string>("email-link").resolve(request.secret);
},

abortVerification: async (ctx: wf.SharedWfContext) => {
// failing the durable promise will throw an Error for the awaiting thread
ctx.promise<string>("email-link").reject("User aborted verification");
},
abortVerification: async (ctx: restate.WorkflowSharedContext) => {
// failing the durable promise will throw an Error for the awaiting thread
await ctx.promise<string>("email-link").reject("User aborted verification");
},
}
});

export const workflowApi = myWorkflow.api;
restate.endpoint().bind(myWorkflow).listen();

export type WorkflowApi = typeof myWorkflow;

// ---------- ⬆️⬆️ deploy this as a container, lambda, etc. ⬆️⬆️ ----------

// start it via an HTTP call.
// `curl restate:8080/usersignup/submit --json '{ "request": {
// "workflowId": "signup-userid1",
// "name": "Bob",
// "email": "[email protected]"
// } }'
// `curl restate:8080/usersignup/signup-userid1/run/send --json '{ "name": "Bob", "email": "[email protected]" }'

// or programmatically
async function signupUser(userId: string, name: string, email: string) {
const rs = restate.clients.connect("http://restate:8080");
const { client, status } = await rs.submitWorkflow(workflowApi, "signup-" + userId, {
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflow: WorkflowApi = { name: "usersignup" };
const workflowClient = rs.workflowClient(workflow, "signup-" + userId);
const { status } = await workflowClient.workflowSubmit({
name,
email,
});

if (status != wf.WorkflowStartResult.STARTED) {
if (status != "Accepted") {
throw new Error("User ID already taken");
}

await client.result();
await workflowClient.workflowAttach();
}

// interact with the workflow from any other code
async function verifyEmail(userId: string, emailSecret: string) {
const rs = restate.clients.connect("http://restate:8080");
const { client, status } = await rs.connectToWorkflow(workflowApi, "signup-" + userId);
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflow: WorkflowApi = { name: "usersignup" };
const workflowClient = rs.workflowClient(workflow, "signup-" + userId);

client.workflowInterface().verifyEmail({ secret: emailSecret });
workflowClient.verifyEmail({ secret: emailSecret });
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
mavenCentral()
}

val restateVersion = "0.9.2"
val restateVersion = "1.0.0"

dependencies {
// Restate SDK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package dev.restate.sdk.examples;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
Expand Down Expand Up @@ -59,7 +59,7 @@ public void start(ObjectContext ctx, DeliveryRequest request) throws TerminalExc
ctx.set(DELIVERY_INFO, deliveryInfo);

// Acquire a driver
var driverAwakeable = ctx.awakeable(CoreSerdes.JSON_STRING);
var driverAwakeable = ctx.awakeable(JsonSerdes.STRING);
DriverDeliveryMatcherClient.fromContext(ctx, GeoUtils.DEMO_REGION)
.send()
.requestDriverForDelivery(driverAwakeable.id());
Expand Down Expand Up @@ -119,7 +119,7 @@ public void notifyDeliveryDelivered(ObjectContext ctx) throws TerminalException
ctx.clear(DELIVERY_INFO);

// Notify the OrderService that the delivery has been completed
ctx.awakeableHandle(delivery.getCallbackId()).resolve(CoreSerdes.VOID, null);
ctx.awakeableHandle(delivery.getCallbackId()).resolve(Serde.VOID, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
package dev.restate.sdk.examples;

import com.fasterxml.jackson.core.type.TypeReference;
import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.serde.jackson.JacksonSerdes;
Expand Down Expand Up @@ -52,7 +52,7 @@ public void setDriverAvailable(ObjectContext ctx, String driverId) throws Termin
// Update the queue in state. Delivery was removed.
ctx.set(PENDING_DELIVERIES, pendingDeliveries);
// Notify that delivery is ongoing
ctx.awakeableHandle(nextDelivery).resolve(CoreSerdes.JSON_STRING, driverId);
ctx.awakeableHandle(nextDelivery).resolve(JsonSerdes.STRING, driverId);
return;
}

Expand All @@ -77,7 +77,7 @@ public void requestDriverForDelivery(ObjectContext ctx, String deliveryCallbackI
// Remove driver from the pool
ctx.set(AVAILABLE_DRIVERS, availableDrivers);
// Notify that delivery is ongoing
ctx.awakeableHandle(deliveryCallbackId).resolve(CoreSerdes.JSON_STRING, nextAvailableDriver);
ctx.awakeableHandle(deliveryCallbackId).resolve(JsonSerdes.STRING, nextAvailableDriver);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package dev.restate.sdk.examples;

import dev.restate.sdk.JsonSerdes;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.examples.types.StatusEnum;
Expand All @@ -25,7 +25,7 @@ public class OrderStatusService {

private static final StateKey<StatusEnum> ORDER_STATUS =
StateKey.of("order-status", JacksonSerdes.of(StatusEnum.class));
private static final StateKey<Long> ORDER_ETA = StateKey.of("order-eta", CoreSerdes.JSON_LONG);
private static final StateKey<Long> ORDER_ETA = StateKey.of("order-eta", JsonSerdes.LONG);

public static class OrderStatus {
private final StatusEnum status;
Expand Down
Loading
Loading