Skip to content

Commit c01299d

Browse files
author
Siri Varma Vegiraju
committed
fix things
Signed-off-by: Siri Varma Vegiraju <s_vegiraju@apple.com>
1 parent 9c494d5 commit c01299d

17 files changed

Lines changed: 223 additions & 516 deletions

File tree

examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,44 @@
1616
import io.dapr.durabletask.TaskFailedException;
1717
import io.dapr.workflows.Workflow;
1818
import io.dapr.workflows.WorkflowStub;
19+
import io.dapr.workflows.WorkflowTaskOptions;
20+
import io.dapr.workflows.WorkflowTaskRetryPolicy;
21+
22+
import java.util.List;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.time.Duration;
1926

2027
public class BookTripWorkflow implements Workflow {
2128
@Override
2229
public WorkflowStub create() {
2330
return ctx -> {
2431
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
25-
CompensationHelper compensationHelper = new CompensationHelper();
32+
List<String> compensations = new ArrayList<>();
33+
34+
// Define retry policy for compensation activities
35+
WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder()
36+
.setFirstRetryInterval(Duration.ofSeconds(1))
37+
.setMaxNumberOfAttempts(3)
38+
.build();
39+
40+
WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy);
2641

2742
try {
2843
// Book flight
29-
String flightResult = ctx.callActivity(
30-
BookFlightActivity.class.getName(), null, String.class).await();
44+
String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), null, String.class).await();
3145
ctx.getLogger().info("Flight booking completed: {}", flightResult);
32-
compensationHelper.addCompensation("CancelFlight", () ->
33-
ctx.callActivity(CancelFlightActivity.class.getName(), null, String.class).await());
46+
compensations.add("CancelFlight");
3447

3548
// Book hotel
36-
String hotelResult = ctx.callActivity(
37-
BookHotelActivity.class.getName(), null, String.class).await();
49+
String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), null, String.class).await();
3850
ctx.getLogger().info("Hotel booking completed: {}", hotelResult);
39-
compensationHelper.addCompensation("CancelHotel", () ->
40-
ctx.callActivity(CancelHotelActivity.class.getName(), null, String.class).await());
51+
compensations.add("CancelHotel");
4152

4253
// Book car
43-
String carResult = ctx.callActivity(
44-
BookCarActivity.class.getName(), null, String.class).await();
54+
String carResult = ctx.callActivity(BookCarActivity.class.getName(), null, String.class).await();
4555
ctx.getLogger().info("Car booking completed: {}", carResult);
46-
compensationHelper.addCompensation("CancelCar", () ->
47-
ctx.callActivity(CancelCarActivity.class.getName(), null, String.class).await());
56+
compensations.add("CancelCar");
4857

4958
String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult);
5059
ctx.getLogger().info("Trip booked successfully: {}", result);
@@ -53,7 +62,44 @@ public WorkflowStub create() {
5362
} catch (TaskFailedException e) {
5463
ctx.getLogger().info("******** executing compensation logic ********");
5564
ctx.getLogger().error("Activity failed: {}", e.getMessage());
56-
compensationHelper.compensate();
65+
66+
// Execute compensations in reverse order
67+
Collections.reverse(compensations);
68+
for (String compensation : compensations) {
69+
try {
70+
switch (compensation) {
71+
case "CancelCar":
72+
String carCancelResult = ctx.callActivity(
73+
CancelCarActivity.class.getName(),
74+
null,
75+
compensationOptions,
76+
String.class).await();
77+
ctx.getLogger().info("Car cancellation completed: {}", carCancelResult);
78+
break;
79+
80+
case "CancelHotel":
81+
String hotelCancelResult = ctx.callActivity(
82+
CancelHotelActivity.class.getName(),
83+
null,
84+
compensationOptions,
85+
String.class).await();
86+
ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult);
87+
break;
88+
89+
case "CancelFlight":
90+
String flightCancelResult = ctx.callActivity(
91+
CancelFlightActivity.class.getName(),
92+
null,
93+
compensationOptions,
94+
String.class).await();
95+
ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult);
96+
break;
97+
}
98+
} catch (TaskFailedException ex) {
99+
// Only catch TaskFailedException for actual activity failures
100+
ctx.getLogger().error("Activity failed during compensation: {}", ex.getMessage());
101+
}
102+
}
57103
ctx.complete("Workflow failed, compensation applied");
58104
}
59105
};

examples/src/main/java/io/dapr/examples/workflows/compensation/CompensationHelper.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

sdk-tests/src/test/java/io/dapr/it/DaprRun.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,13 @@ public void stop() throws InterruptedException, IOException {
172172
System.out.println("Stopping dapr application ...");
173173
try {
174174
this.stopCommand.run();
175-
176175
System.out.println("Dapr application stopped.");
177176
} catch (RuntimeException e) {
178-
System.out.println("Could not stop app " + this.appName + ": " + e.getMessage());
177+
if (e.getMessage() != null && e.getMessage().contains("Could not find success criteria")) {
178+
System.out.println("App " + this.appName + " already stopped or not found (ignored).");
179+
} else {
180+
System.out.println("Could not stop app " + this.appName + ": " + e.getMessage());
181+
}
179182
}
180183
}
181184

@@ -219,8 +222,7 @@ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedExceptio
219222
while (System.currentTimeMillis() <= maxWait) {
220223
try {
221224
stub.healthCheck(Empty.getDefaultInstance());
222-
// artursouza: workaround due to race condition with runtime's probe on app's health.
223-
Thread.sleep(5000);
225+
Thread.sleep(2000);
224226
return;
225227
} catch (Exception e) {
226228
Thread.sleep(1000);
@@ -232,29 +234,31 @@ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedExceptio
232234
channel.shutdown();
233235
}
234236
} else {
235-
Duration waitDuration = Duration.ofMillis(maxWaitMilliseconds);
237+
long maxWait = System.currentTimeMillis() + maxWaitMilliseconds;
236238
HttpClient client = HttpClient.newBuilder()
237239
.version(HttpClient.Version.HTTP_1_1)
238-
.connectTimeout(waitDuration)
240+
.connectTimeout(Duration.ofSeconds(5))
239241
.build();
240242
String url = "http://127.0.0.1:" + this.getAppPort() + "/health";
241243
HttpRequest request = HttpRequest.newBuilder()
242244
.GET()
243245
.uri(URI.create(url))
244246
.build();
245247

246-
try {
247-
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
248-
249-
if (response.statusCode() != 200) {
250-
throw new RuntimeException("error: HTTP service is not healthy.");
248+
while (System.currentTimeMillis() <= maxWait) {
249+
try {
250+
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
251+
if (response.statusCode() == 200) {
252+
Thread.sleep(2000);
253+
return;
254+
}
255+
} catch (IOException e) {
256+
// not ready yet
251257
}
252-
} catch (IOException e) {
253-
throw new RuntimeException("exception: HTTP service is not healthy.");
258+
Thread.sleep(1000);
254259
}
255260

256-
// artursouza: workaround due to race condition with runtime's probe on app's health.
257-
Thread.sleep(5000);
261+
throw new RuntimeException("timeout: HTTP service is not healthy.");
258262
}
259263
}
260264

sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,19 @@ public void reminderRecoveryTest(
129129
) throws Exception {
130130
setup(actorType);
131131

132-
logger.debug("Pausing 3 seconds to let gRPC connection get ready");
133-
Thread.sleep(3000);
134-
135132
logger.debug("Invoking actor method 'startReminder' which will register a reminder");
136133
proxy.invokeMethod("setReminderData", reminderDataParam).block();
137134

138135
proxy.invokeMethod("startReminder", reminderName).block();
139136

140-
logger.debug("Pausing 7 seconds to allow reminder to fire");
141-
Thread.sleep(7000);
142-
137+
logger.debug("Waiting for reminder to fire at least 3 times");
143138
final List<MethodEntryTracker> logs = new ArrayList<>();
144139
callWithRetry(() -> {
145140
logs.clear();
146141
logs.addAll(fetchMethodCallLogs(proxy));
147142
validateMethodCalls(logs, METHOD_NAME, 3);
148143
validateMessageContent(logs, METHOD_NAME, expectedReminderStateText);
149-
}, 5000);
144+
}, 30000);
150145

151146
// Restarts runtime only.
152147
logger.info("Stopping Dapr sidecar");

sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public void timerRecoveryTest() throws Exception {
5454
true,
5555
60000);
5656

57-
Thread.sleep(3000);
5857
String actorType="MyActorTest";
5958
logger.debug("Creating proxy builder");
6059

@@ -68,16 +67,14 @@ public void timerRecoveryTest() throws Exception {
6867
logger.debug("Invoking actor method 'startTimer' which will register a timer");
6968
proxy.invokeMethod("startTimer", "myTimer").block();
7069

71-
logger.debug("Pausing 7 seconds to allow timer to fire");
72-
Thread.sleep(7000);
73-
70+
logger.debug("Waiting for timer to fire at least 3 times");
7471
final List<MethodEntryTracker> logs = new ArrayList<>();
7572
callWithRetry(() -> {
7673
logs.clear();
7774
logs.addAll(fetchMethodCallLogs(proxy));
7875
validateMethodCalls(logs, METHOD_NAME, 3);
7976
validateMessageContent(logs, METHOD_NAME, "ping!");
80-
}, 5000);
77+
}, 30000);
8178

8279
// Restarts app only.
8380
runs.left.stop();
@@ -91,7 +88,7 @@ public void timerRecoveryTest() throws Exception {
9188
newLogs.clear();
9289
newLogs.addAll(fetchMethodCallLogs(proxy));
9390
validateMethodCalls(newLogs, METHOD_NAME, 3);
94-
}, 10000);
91+
}, 30000);
9592

9693
// Check that the restart actually happened by confirming the old logs are not in the new logs.
9794
for (MethodEntryTracker oldLog: logs) {

0 commit comments

Comments
 (0)