diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index 2acf112527..f5d2cbb8d9 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -6,7 +6,7 @@ weight: 20000 description: How to get up and running with workflows using the Dapr Java SDK --- -Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: +Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: - Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) - Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java) @@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. ``` -## Run the `DemoWorkflowClient +## Run the `DemoWorkflowClient` The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. - ```java public class DemoWorkflowClient { @@ -246,4 +245,45 @@ Exiting DemoWorkflowClient. ## Next steps - [Learn more about Dapr workflow]({{< ref workflow-overview.md >}}) -- [Workflow API reference]({{< ref workflow_api.md >}}) \ No newline at end of file +- [Workflow API reference]({{< ref workflow_api.md >}}) + +## Advanced features + +### Task Execution Keys + +Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for: + +1. **Idempotency**: Ensuring activities are not executed multiple times for the same task +2. **State Management**: Tracking the state of activity execution +3. **Error Handling**: Managing retries and failures in a controlled manner + +Here's an example of how to use task execution keys in your workflow activities: + +```java +public class TaskExecutionKeyActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + // Get the task execution key for this activity + String taskExecutionKey = ctx.getTaskExecutionKey(); + + // Use the key to implement idempotency or state management + // For example, check if this task has already been executed + if (isTaskAlreadyExecuted(taskExecutionKey)) { + return getPreviousResult(taskExecutionKey); + } + + // Execute the activity logic + Object result = executeActivityLogic(); + + // Store the result with the task execution key + storeResult(taskExecutionKey, result); + + return result; + } +} +``` + + + + + diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index 5c6a360c8a..bb1f1c7689 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + import io.dapr.testcontainers.Component; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; @@ -40,6 +41,7 @@ import java.util.Map; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -117,6 +119,29 @@ public void testWorkflows() throws Exception { assertEquals(instanceId, workflowOutput.getWorkflowId()); } + @Test + public void testExecutionKeyWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); + + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false); + + Duration timeout = Duration.ofSeconds(1000); + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(1, workflowOutput.getPayloads().size()); + assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); + + String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity"; + assertTrue(KeyStore.getInstance().getKey(executionKey)); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java new file mode 100644 index 0000000000..017e1c50be --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers; + +import java.util.HashMap; +import java.util.Map; + +public class KeyStore { + + private final Map keyStore = new HashMap<>(); + + private static KeyStore instance; + + private KeyStore() { + } + + public static KeyStore getInstance() { + if (instance == null) { + synchronized (KeyStore.class) { + if (instance == null) { + instance = new KeyStore(); + } + } + } + return instance; + } + + + public void addKey(String key, Boolean value) { + keyStore.put(key, value); + } + + public Boolean getKey(String key) { + return keyStore.get(key); + } + + public void removeKey(String key) { + keyStore.remove(key); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java new file mode 100644 index 0000000000..c1a5b50381 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java @@ -0,0 +1,35 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; + +public class TaskExecutionKeyActivity implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + KeyStore keyStore = KeyStore.getInstance(); + Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey()); + if (!Boolean.TRUE.equals(exists)) { + keyStore.addKey(ctx.getTaskExecutionKey(), true); + workflowPayload.getPayloads().add("Execution key not found"); + throw new IllegalStateException("Task execution key not found"); + } + workflowPayload.getPayloads().add("Execution key found"); + return workflowPayload; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java index 0a2487b70c..e868b18870 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java @@ -56,9 +56,12 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder( WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); + builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - + builder.registerActivity(TaskExecutionKeyActivity.class); + + return builder; } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java new file mode 100644 index 0000000000..30a9ea33f2 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -0,0 +1,58 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.durabletask.Task; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.time.Duration; + +import org.slf4j.Logger; + +public class TestExecutionKeysWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID: " + instanceId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + workflowPayload.setWorkflowId(instanceId); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(3) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(50)) + .build()); + + + Task t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); + + TestWorkflowPayload payloadAfterExecution = t.await(); + + ctx.complete(payloadAfterExecution); + }; + } + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 3fe5d88a23..90a2c41a59 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -17,6 +17,8 @@ public interface WorkflowActivityContext { String getName(); + String getTaskExecutionKey(); + T getInput(Class targetType); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index 551c21a373..217c3cd183 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -56,4 +56,9 @@ public String getName() { public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } + + @Override + public String getTaskExecutionKey() { + return this.innerContext.getTaskExecutionKey(); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 76a7e07af1..81ac492e05 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -16,7 +16,7 @@ public static class TestActivity implements WorkflowActivity { @Override public Object run(WorkflowActivityContext ctx) { String activityContextName = ctx.getName(); - return ctx.getInput(String.class) + " world! from " + activityContextName; + return ctx.getInput(String.class) + " world! from " + activityContextName + " with task execution key " + ctx.getTaskExecutionKey(); } } @@ -37,10 +37,11 @@ public void createWithClass() { when(mockContext.getInput(String.class)).thenReturn("Hello"); when(mockContext.getName()).thenReturn("TestActivityContext"); + when(mockContext.getTaskExecutionKey()).thenReturn("123"); Object result = wrapper.create().run(mockContext); verify(mockContext, times(1)).getInput(String.class); - assertEquals("Hello world! from TestActivityContext", result); + assertEquals("Hello world! from TestActivityContext with task execution key 123", result); } }