Skip to content

Commit 7ed4d91

Browse files
cicoylesiri-varma
andauthored
Use dapr/durabletask-java (#1336)
* microsoft durabletask-java -> dapr durabletask-java Signed-off-by: Cassandra Coyle <[email protected]> * update another ref Signed-off-by: Cassandra Coyle <[email protected]> * 1.5.2 release Signed-off-by: Cassandra Coyle <[email protected]> * fix import order Signed-off-by: Cassandra Coyle <[email protected]> * Sdk new changes Signed-off-by: siri-varma <[email protected]> * Refine workflows Signed-off-by: siri-varma <[email protected]> * add ; Signed-off-by: Cassandra Coyle <[email protected]> * rm try Signed-off-by: Cassandra Coyle <[email protected]> --------- Signed-off-by: Cassandra Coyle <[email protected]> Signed-off-by: siri-varma <[email protected]> Co-authored-by: siri-varma <[email protected]>
1 parent ecc94f5 commit 7ed4d91

34 files changed

+165
-107
lines changed

dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,9 @@ private void registerWorkflowsAndActivities(ApplicationContext applicationContex
4646
workflowRuntimeBuilder.registerActivity(activity);
4747
}
4848

49-
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
50-
LOGGER.info("Starting workflow runtime ... ");
51-
runtime.start(false);
52-
}
49+
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
50+
LOGGER.info("Starting workflow runtime ... ");
51+
runtime.start(false);
5352
}
5453

5554
@Override

examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
package io.dapr.examples.unittesting;
1515

16-
import com.microsoft.durabletask.Task;
17-
import com.microsoft.durabletask.TaskCanceledException;
16+
import io.dapr.durabletask.Task;
17+
import io.dapr.durabletask.TaskCanceledException;
1818
import io.dapr.workflows.Workflow;
1919
import io.dapr.workflows.WorkflowContext;
2020
import io.dapr.workflows.WorkflowStub;

examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
2929
builder.registerActivity(ToUpperCaseActivity.class);
3030

3131
// Build and then start the workflow runtime pulling and executing tasks
32-
try (WorkflowRuntime runtime = builder.build()) {
33-
System.out.println("Start workflow runtime");
34-
runtime.start();
35-
}
32+
WorkflowRuntime runtime = builder.build();
33+
System.out.println("Start workflow runtime");
34+
runtime.start();
3635
}
3736
}

examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ public static void main(String[] args) throws Exception {
3131
builder.registerActivity(ReverseActivity.class);
3232

3333
// Build and then start the workflow runtime pulling and executing tasks
34-
try (WorkflowRuntime runtime = builder.build()) {
35-
System.out.println("Start workflow runtime");
36-
runtime.start();
37-
}
34+
WorkflowRuntime runtime = builder.build();
35+
System.out.println("Start workflow runtime");
3836
}
3937
}

examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import io.dapr.workflows.runtime.WorkflowRuntime;
1717
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
1818

19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
21+
1922
public class DemoContinueAsNewWorker {
2023
/**
2124
* The main method of this app.
@@ -25,13 +28,14 @@ public class DemoContinueAsNewWorker {
2528
*/
2629
public static void main(String[] args) throws Exception {
2730
// Register the Workflow with the builder.
28-
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class);
31+
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().
32+
registerWorkflow(DemoContinueAsNewWorkflow.class)
33+
.withExecutorService(Executors.newFixedThreadPool(3));
2934
builder.registerActivity(CleanUpActivity.class);
3035

3136
// Build and then start the workflow runtime pulling and executing tasks
32-
try (WorkflowRuntime runtime = builder.build()) {
33-
System.out.println("Start workflow runtime");
34-
runtime.start();
35-
}
37+
WorkflowRuntime runtime = builder.build();
38+
System.out.println("Start workflow runtime");
39+
runtime.start();
3640
}
3741
}

examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ public static void main(String[] args) throws Exception {
3030
builder.registerActivity(DenyActivity.class);
3131

3232
// Build and then start the workflow runtime pulling and executing tasks
33-
try (WorkflowRuntime runtime = builder.build()) {
34-
System.out.println("Start workflow runtime");
35-
runtime.start();
36-
}
33+
WorkflowRuntime runtime = builder.build();
34+
System.out.println("Start workflow runtime");
35+
runtime.start();
3736
}
3837
}

examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
2929
builder.registerActivity(CountWordsActivity.class);
3030

3131
// Build and then start the workflow runtime pulling and executing tasks
32-
try (WorkflowRuntime runtime = builder.build()) {
33-
System.out.println("Start workflow runtime");
34-
runtime.start();
35-
}
32+
WorkflowRuntime runtime = builder.build();
33+
System.out.println("Start workflow runtime");
34+
runtime.start(false);
3635
}
3736
}

examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.dapr.examples.workflows.faninout;
1515

16-
import com.microsoft.durabletask.Task;
16+
import io.dapr.durabletask.Task;
1717
import io.dapr.workflows.Workflow;
1818
import io.dapr.workflows.WorkflowStub;
1919

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<!--
3737
manually declare durabletask-client's jackson dependencies for workflows sdk
3838
which conflict with dapr-sdk's jackson dependencies
39-
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
39+
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
4040
-->
4141
<jackson.version>2.16.1</jackson.version>
4242
<gpg.skip>true</gpg.skip>

sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,9 @@ static void daprProperties(DynamicPropertyRegistry registry) {
9191
*/
9292
@BeforeEach
9393
public void init() {
94-
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
95-
System.out.println("Start workflow runtime");
96-
runtime.start(false);
97-
}
94+
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
95+
System.out.println("Start workflow runtime");
96+
runtime.start(false);
9897
}
9998

10099
@Test

sdk-workflows/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@
4545
<scope>test</scope>
4646
</dependency>
4747
<dependency>
48-
<groupId>com.microsoft</groupId>
48+
<groupId>io.dapr</groupId>
4949
<artifactId>durabletask-client</artifactId>
50-
<version>1.5.0</version>
50+
<version>1.5.2</version>
5151
</dependency>
5252
<!--
5353
manually declare durabletask-client's jackson dependencies
5454
which conflict with dapr-sdk's jackson dependencies
55-
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
55+
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
5656
-->
5757
<dependency>
5858
<groupId>com.fasterxml.jackson.core</groupId>

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
package io.dapr.workflows;
1515

16-
import com.microsoft.durabletask.CompositeTaskFailedException;
17-
import com.microsoft.durabletask.Task;
18-
import com.microsoft.durabletask.TaskCanceledException;
19-
import com.microsoft.durabletask.TaskFailedException;
16+
import io.dapr.durabletask.CompositeTaskFailedException;
17+
import io.dapr.durabletask.Task;
18+
import io.dapr.durabletask.TaskCanceledException;
19+
import io.dapr.durabletask.TaskFailedException;
2020
import org.slf4j.Logger;
2121

2222
import javax.annotation.Nullable;

sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313

1414
package io.dapr.workflows.client;
1515

16-
import com.microsoft.durabletask.DurableTaskClient;
17-
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
18-
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
19-
import com.microsoft.durabletask.OrchestrationMetadata;
20-
import com.microsoft.durabletask.PurgeResult;
2116
import io.dapr.config.Properties;
17+
import io.dapr.durabletask.DurableTaskClient;
18+
import io.dapr.durabletask.DurableTaskGrpcClientBuilder;
19+
import io.dapr.durabletask.NewOrchestrationInstanceOptions;
20+
import io.dapr.durabletask.OrchestrationMetadata;
21+
import io.dapr.durabletask.PurgeResult;
2222
import io.dapr.utils.NetworkUtils;
2323
import io.dapr.workflows.Workflow;
2424
import io.dapr.workflows.internal.ApiTokenClientInterceptor;

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.TaskActivityContext;
16+
import io.dapr.durabletask.TaskActivityContext;
1717
import io.dapr.workflows.WorkflowActivityContext;
1818

1919
/**

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.CompositeTaskFailedException;
17-
import com.microsoft.durabletask.RetryPolicy;
18-
import com.microsoft.durabletask.Task;
19-
import com.microsoft.durabletask.TaskCanceledException;
20-
import com.microsoft.durabletask.TaskOptions;
21-
import com.microsoft.durabletask.TaskOrchestrationContext;
16+
import io.dapr.durabletask.CompositeTaskFailedException;
17+
import io.dapr.durabletask.RetryPolicy;
18+
import io.dapr.durabletask.Task;
19+
import io.dapr.durabletask.TaskCanceledException;
20+
import io.dapr.durabletask.TaskOptions;
21+
import io.dapr.durabletask.TaskOrchestrationContext;
2222
import io.dapr.workflows.WorkflowContext;
2323
import io.dapr.workflows.WorkflowTaskOptions;
2424
import io.dapr.workflows.WorkflowTaskRetryPolicy;

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.FailureDetails;
16+
import io.dapr.durabletask.FailureDetails;
1717
import io.dapr.workflows.client.WorkflowFailureDetails;
1818

1919
/**

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.FailureDetails;
17-
import com.microsoft.durabletask.OrchestrationMetadata;
18-
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
16+
import io.dapr.durabletask.FailureDetails;
17+
import io.dapr.durabletask.OrchestrationMetadata;
18+
import io.dapr.durabletask.OrchestrationRuntimeStatus;
1919
import io.dapr.workflows.client.WorkflowFailureDetails;
2020
import io.dapr.workflows.client.WorkflowInstanceStatus;
2121
import io.dapr.workflows.client.WorkflowRuntimeStatus;

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.TaskActivity;
17-
import com.microsoft.durabletask.TaskActivityFactory;
16+
import io.dapr.durabletask.TaskActivity;
17+
import io.dapr.durabletask.TaskActivityFactory;
1818
import io.dapr.workflows.WorkflowActivity;
1919

2020
import java.lang.reflect.Constructor;

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.TaskActivity;
17-
import com.microsoft.durabletask.TaskActivityFactory;
16+
import io.dapr.durabletask.TaskActivity;
17+
import io.dapr.durabletask.TaskActivityFactory;
1818
import io.dapr.workflows.WorkflowActivity;
1919

2020
/**

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.TaskOrchestration;
17-
import com.microsoft.durabletask.TaskOrchestrationFactory;
16+
import io.dapr.durabletask.TaskOrchestration;
17+
import io.dapr.durabletask.TaskOrchestrationFactory;
1818
import io.dapr.workflows.Workflow;
1919

2020
import java.lang.reflect.Constructor;

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.TaskOrchestration;
17-
import com.microsoft.durabletask.TaskOrchestrationFactory;
16+
import io.dapr.durabletask.TaskOrchestration;
17+
import io.dapr.durabletask.TaskOrchestrationFactory;
1818
import io.dapr.workflows.Workflow;
1919

2020
/**

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,34 @@
1313

1414
package io.dapr.workflows.runtime;
1515

16-
import com.microsoft.durabletask.DurableTaskGrpcWorker;
16+
import io.dapr.durabletask.DurableTaskGrpcWorker;
17+
import io.grpc.ManagedChannel;
18+
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.TimeUnit;
1721

1822
/**
1923
* Contains methods to register workflows and activities.
2024
*/
2125
public class WorkflowRuntime implements AutoCloseable {
2226

23-
private DurableTaskGrpcWorker worker;
27+
private final DurableTaskGrpcWorker worker;
28+
private final ManagedChannel managedChannel;
29+
private final ExecutorService executorService;
2430

25-
public WorkflowRuntime(DurableTaskGrpcWorker worker) {
31+
/**
32+
* Constructor.
33+
*
34+
* @param worker grpcWorker processing activities.
35+
* @param managedChannel grpc channel.
36+
* @param executorService executor service responsible for running the threads.
37+
*/
38+
public WorkflowRuntime(DurableTaskGrpcWorker worker,
39+
ManagedChannel managedChannel,
40+
ExecutorService executorService) {
2641
this.worker = worker;
42+
this.managedChannel = managedChannel;
43+
this.executorService = executorService;
2744
}
2845

2946
/**
@@ -50,11 +67,31 @@ public void start(boolean block) {
5067
/**
5168
* {@inheritDoc}
5269
*/
53-
@Override
5470
public void close() {
55-
if (this.worker != null) {
56-
this.worker.close();
57-
this.worker = null;
71+
this.shutDownWorkerPool();
72+
this.closeSideCarChannel();
73+
}
74+
75+
private void closeSideCarChannel() {
76+
this.managedChannel.shutdown();
77+
78+
try {
79+
if (!this.managedChannel.awaitTermination(60, TimeUnit.SECONDS)) {
80+
this.managedChannel.shutdownNow();
81+
}
82+
} catch (InterruptedException ex) {
83+
Thread.currentThread().interrupt();
84+
}
85+
}
86+
87+
private void shutDownWorkerPool() {
88+
this.executorService.shutdown();
89+
try {
90+
if (!this.executorService.awaitTermination(60, TimeUnit.SECONDS)) {
91+
this.executorService.shutdownNow();
92+
}
93+
} catch (InterruptedException ex) {
94+
Thread.currentThread().interrupt();
5895
}
5996
}
6097
}

0 commit comments

Comments
 (0)