Skip to content

Commit 5e9b33f

Browse files
authored
Cleanup retryer and retry options (#558)
1 parent f6cca51 commit 5e9b33f

20 files changed

+79
-311
lines changed

src/main/java/com/uber/cadence/client/WorkflowClientOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2+
* Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3+
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
24
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
35
*
4-
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5-
*
66
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
77
* use this file except in compliance with the License. A copy of the License is
88
* located at

src/main/java/com/uber/cadence/internal/common/Retryer.java renamed to src/main/java/com/uber/cadence/internal/common/RpcRetryer.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2-
* Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3-
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
42
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
53
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
66
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
77
* use this file except in compliance with the License. A copy of the License is
88
* located at
@@ -19,7 +19,13 @@
1919

2020
import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap;
2121

22-
import com.uber.cadence.*;
22+
import com.uber.cadence.BadRequestError;
23+
import com.uber.cadence.CancellationAlreadyRequestedError;
24+
import com.uber.cadence.DomainAlreadyExistsError;
25+
import com.uber.cadence.DomainNotActiveError;
26+
import com.uber.cadence.EntityNotExistsError;
27+
import com.uber.cadence.QueryFailedError;
28+
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
2329
import com.uber.cadence.common.RetryOptions;
2430
import java.time.Duration;
2531
import java.util.concurrent.CancellationException;
@@ -31,8 +37,8 @@
3137
import org.slf4j.Logger;
3238
import org.slf4j.LoggerFactory;
3339

34-
public final class Retryer {
35-
public static final RetryOptions DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
40+
public final class RpcRetryer {
41+
public static final RetryOptions DEFAULT_RPC_RETRY_OPTIONS;
3642

3743
private static final Duration RETRY_SERVICE_OPERATION_INITIAL_INTERVAL = Duration.ofMillis(20);
3844
private static final Duration RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL = Duration.ofMinutes(1);
@@ -58,7 +64,7 @@ public final class Retryer {
5864
QueryFailedError.class,
5965
DomainNotActiveError.class,
6066
CancellationAlreadyRequestedError.class);
61-
DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults();
67+
DEFAULT_RPC_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults();
6268
}
6369

6470
public interface RetryableProc<E extends Throwable> {
@@ -81,7 +87,7 @@ private static class ValueExceptionPair<V> {
8187
private final CompletableFuture<V> value;
8288
private final Throwable exception;
8389

84-
public ValueExceptionPair(CompletableFuture<V> value, Throwable exception) {
90+
ValueExceptionPair(CompletableFuture<V> value, Throwable exception) {
8591
this.value = value;
8692
this.exception = exception;
8793
}
@@ -95,7 +101,7 @@ public Throwable getException() {
95101
}
96102
}
97103

98-
private static final Logger log = LoggerFactory.getLogger(Retryer.class);
104+
private static final Logger log = LoggerFactory.getLogger(RpcRetryer.class);
99105

100106
public static <T extends Throwable> void retry(RetryOptions options, RetryableProc<T> r)
101107
throws T {
@@ -107,6 +113,10 @@ public static <T extends Throwable> void retry(RetryOptions options, RetryablePr
107113
});
108114
}
109115

116+
public static <T extends Throwable> void retry(RetryableProc<T> r) throws T {
117+
retry(DEFAULT_RPC_RETRY_OPTIONS, r);
118+
}
119+
110120
public static <R, T extends Throwable> R retryWithResult(
111121
RetryOptions options, RetryableFunc<R, T> r) throws T {
112122
int attempt = 0;
@@ -266,5 +276,5 @@ private static <T extends Throwable> void rethrow(Exception e) throws T {
266276
}
267277

268278
/** Prohibits instantiation. */
269-
private Retryer() {}
279+
private RpcRetryer() {}
270280
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

+3-13
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.gson.JsonParser;
2828
import com.google.gson.JsonPrimitive;
2929
import com.uber.cadence.ActivityType;
30-
import com.uber.cadence.BadRequestError;
3130
import com.uber.cadence.Decision;
3231
import com.uber.cadence.DecisionType;
3332
import com.uber.cadence.DescribeWorkflowExecutionRequest;
@@ -91,15 +90,6 @@ public class WorkflowExecutionUtils {
9190
*/
9291
private static final String INDENTATION = " ";
9392

94-
private static RetryOptions retryParameters =
95-
new RetryOptions.Builder()
96-
.setBackoffCoefficient(2)
97-
.setInitialInterval(Duration.ofMillis(500))
98-
.setMaximumInterval(Duration.ofSeconds(30))
99-
.setMaximumAttempts(Integer.MAX_VALUE)
100-
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
101-
.build();
102-
10393
// Wait period for passive cluster to retry getting workflow result in case of replication delay.
10494
private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500;
10595

@@ -216,7 +206,7 @@ private static HistoryEvent getInstanceCloseEvent(
216206
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
217207
try {
218208
response =
219-
Retryer.retryWithResult(
209+
RpcRetryer.retryWithResult(
220210
retryOptions,
221211
() -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout)));
222212
} catch (EntityNotExistsError e) {
@@ -343,7 +333,7 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
343333
}
344334

345335
private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) {
346-
return new RetryOptions.Builder(retryParameters)
336+
return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
347337
.setExpiration(Duration.ofSeconds(unit.toSeconds(timeout)))
348338
.build();
349339
}
@@ -355,7 +345,7 @@ private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit uni
355345
long timeout,
356346
TimeUnit unit) {
357347
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
358-
return Retryer.retryWithResultAsync(
348+
return RpcRetryer.retryWithResultAsync(
359349
retryOptions,
360350
() -> {
361351
CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

+14-24
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,8 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
118118
StartWorkflowExecutionResponse result;
119119
try {
120120
result =
121-
Retryer.retryWithResult(
122-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
123-
() -> service.StartWorkflowExecution(request));
121+
RpcRetryer.retryWithResult(
122+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.StartWorkflowExecution(request));
124123
} catch (WorkflowExecutionAlreadyStartedError e) {
125124
throw e;
126125
} catch (TException e) {
@@ -137,7 +136,7 @@ private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutI
137136
if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) {
138137
return o;
139138
}
140-
return new RetryOptions.Builder(Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS)
139+
return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
141140
.setExpiration(Duration.ofMillis((timeoutInMillis)))
142141
.build();
143142
}
@@ -146,9 +145,8 @@ private CompletableFuture<WorkflowExecution> startWorkflowAsyncInternal(
146145
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
147146
StartWorkflowExecutionRequest request = getStartRequest(startParameters);
148147

149-
return Retryer.retryWithResultAsync(
150-
getRetryOptionsWithExpiration(
151-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
148+
return RpcRetryer.retryWithResultAsync(
149+
getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
152150
() -> {
153151
CompletableFuture<WorkflowExecution> result = new CompletableFuture<>();
154152
try {
@@ -272,9 +270,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
272270
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
273271

274272
try {
275-
Retryer.retry(
276-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
277-
() -> service.SignalWorkflowExecution(request));
273+
RpcRetryer.retry(() -> service.SignalWorkflowExecution(request));
278274
} catch (TException e) {
279275
throw CheckedExceptionWrapper.wrap(e);
280276
}
@@ -290,9 +286,8 @@ public CompletableFuture<Void> signalWorkflowExecutionAsync(
290286
public CompletableFuture<Void> signalWorkflowExecutionAsync(
291287
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
292288
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
293-
return Retryer.retryWithResultAsync(
294-
getRetryOptionsWithExpiration(
295-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
289+
return RpcRetryer.retryWithResultAsync(
290+
getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
296291
() -> {
297292
CompletableFuture<Void> result = new CompletableFuture<>();
298293
try {
@@ -387,8 +382,8 @@ private WorkflowExecution signalWithStartWorkflowInternal(
387382
StartWorkflowExecutionResponse result;
388383
try {
389384
result =
390-
Retryer.retryWithResult(
391-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
385+
RpcRetryer.retryWithResult(
386+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
392387
() -> service.SignalWithStartWorkflowExecution(request));
393388
} catch (TException e) {
394389
throw CheckedExceptionWrapper.wrap(e);
@@ -405,9 +400,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) {
405400
request.setDomain(domain);
406401
request.setWorkflowExecution(execution);
407402
try {
408-
Retryer.retry(
409-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
410-
() -> service.RequestCancelWorkflowExecution(request));
403+
RpcRetryer.retry(() -> service.RequestCancelWorkflowExecution(request));
411404
} catch (TException e) {
412405
throw CheckedExceptionWrapper.wrap(e);
413406
}
@@ -427,9 +420,8 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete
427420
request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
428421
try {
429422
QueryWorkflowResponse response =
430-
Retryer.retryWithResult(
431-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
432-
() -> service.QueryWorkflow(request));
423+
RpcRetryer.retryWithResult(
424+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.QueryWorkflow(request));
433425
return response;
434426
} catch (TException e) {
435427
throw CheckedExceptionWrapper.wrap(e);
@@ -451,9 +443,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term
451443
request.setReason(terminateParameters.getReason());
452444
// request.setChildPolicy(terminateParameters.getChildPolicy());
453445
try {
454-
Retryer.retry(
455-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
456-
() -> service.TerminateWorkflowExecution(request));
446+
RpcRetryer.retry(() -> service.TerminateWorkflowExecution(request));
457447
} catch (TException e) {
458448
throw CheckedExceptionWrapper.wrap(e);
459449
}

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.uber.cadence.client.ActivityCompletionFailureException;
3232
import com.uber.cadence.client.ActivityNotExistsException;
3333
import com.uber.cadence.converter.DataConverter;
34-
import com.uber.cadence.internal.common.Retryer;
34+
import com.uber.cadence.internal.common.RpcRetryer;
3535
import com.uber.cadence.internal.metrics.MetricsType;
3636
import com.uber.cadence.serviceclient.IWorkflowService;
3737
import com.uber.m3.tally.Scope;
@@ -95,9 +95,7 @@ public void complete(Object result) {
9595
request.setResult(convertedResult);
9696
request.setTaskToken(taskToken);
9797
try {
98-
Retryer.retry(
99-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
100-
() -> service.RespondActivityTaskCompleted(request));
98+
RpcRetryer.retry(() -> service.RespondActivityTaskCompleted(request));
10199
metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
102100
} catch (EntityNotExistsError e) {
103101
throw new ActivityNotExistsException(e);
@@ -139,9 +137,7 @@ public void fail(Throwable failure) {
139137
request.setDetails(dataConverter.toData(failure));
140138
request.setTaskToken(taskToken);
141139
try {
142-
Retryer.retry(
143-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
144-
() -> service.RespondActivityTaskFailed(request));
140+
RpcRetryer.retry(() -> service.RespondActivityTaskFailed(request));
145141
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1);
146142
} catch (EntityNotExistsError e) {
147143
throw new ActivityNotExistsException(e);
@@ -156,9 +152,7 @@ public void fail(Throwable failure) {
156152
request.setWorkflowID(execution.getWorkflowId());
157153
request.setRunID(execution.getRunId());
158154
try {
159-
Retryer.retry(
160-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
161-
() -> service.RespondActivityTaskFailedByID(request));
155+
RpcRetryer.retry(() -> service.RespondActivityTaskFailedByID(request));
162156
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1);
163157
} catch (EntityNotExistsError e) {
164158
throw new ActivityNotExistsException(e);

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import com.uber.cadence.WorkflowType;
3333
import com.uber.cadence.common.RetryOptions;
3434
import com.uber.cadence.internal.common.OptionsUtils;
35-
import com.uber.cadence.internal.common.Retryer;
35+
import com.uber.cadence.internal.common.RpcRetryer;
3636
import com.uber.cadence.internal.metrics.MetricsTag;
3737
import com.uber.cadence.internal.metrics.MetricsType;
3838
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
@@ -667,7 +667,7 @@ public HistoryEvent next() {
667667

668668
try {
669669
GetWorkflowExecutionHistoryResponse r =
670-
Retryer.retryWithResult(
670+
RpcRetryer.retryWithResult(
671671
retryOptions, () -> service.GetWorkflowExecutionHistory(request));
672672
current = r.getHistory().getEventsIterator();
673673
nextPageToken = r.getNextPageToken();

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse
113113
e.printStackTrace(pw);
114114
String stackTrace = sw.toString();
115115
failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8));
116-
return new DecisionTaskHandler.Result(null, failedRequest, null, null);
116+
return new DecisionTaskHandler.Result(null, failedRequest, null);
117117
}
118118
}
119119

@@ -236,7 +236,7 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
236236
cache.markProcessingDone(decisionTask);
237237
}
238238
}
239-
return new Result(null, null, queryCompletedRequest, null);
239+
return new Result(null, null, queryCompletedRequest);
240240
}
241241

242242
private Result createCompletedRequest(
@@ -254,7 +254,7 @@ private Result createCompletedRequest(
254254
(int) stickyTaskListScheduleToStartTimeout.getSeconds());
255255
completedRequest.setStickyAttributes(attributes);
256256
}
257-
return new Result(completedRequest, null, null, null);
257+
return new Result(completedRequest, null, null);
258258
}
259259

260260
@Override

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ private ActivityTaskHandler.Result mapToActivityFailure(
147147
failure = CheckedExceptionWrapper.unwrap(failure);
148148
result.setReason(failure.getClass().getName());
149149
result.setDetails(dataConverter.toData(failure));
150-
return new ActivityTaskHandler.Result(
151-
null, new Result.TaskFailedResult(result, failure), null, null);
150+
return new ActivityTaskHandler.Result(null, new Result.TaskFailedResult(result, failure), null);
152151
}
153152

154153
@Override
@@ -214,12 +213,12 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
214213
Object result = method.invoke(activity, args);
215214
RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest();
216215
if (context.isDoNotCompleteOnReturn()) {
217-
return new ActivityTaskHandler.Result(null, null, null, null);
216+
return new ActivityTaskHandler.Result(null, null, null);
218217
}
219218
if (method.getReturnType() != Void.TYPE) {
220219
request.setResult(dataConverter.toData(result));
221220
}
222-
return new ActivityTaskHandler.Result(request, null, null, null);
221+
return new ActivityTaskHandler.Result(request, null, null);
223222
} catch (RuntimeException | IllegalAccessException e) {
224223
return mapToActivityFailure(e, metricsScope, false);
225224
} catch (InvocationTargetException e) {
@@ -252,7 +251,7 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
252251
if (method.getReturnType() != Void.TYPE) {
253252
request.setResult(dataConverter.toData(result));
254253
}
255-
return new ActivityTaskHandler.Result(request, null, null, null);
254+
return new ActivityTaskHandler.Result(request, null, null);
256255
} catch (RuntimeException | IllegalAccessException e) {
257256
return mapToActivityFailure(e, metricsScope, true);
258257
} catch (InvocationTargetException e) {

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

-3
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,6 @@ public Worker newWorker(
139139
WorkerOptions.Builder builder =
140140
WorkerOptions.newBuilder()
141141
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory());
142-
if (testEnvironmentOptions.getDataConverter() != null) {
143-
builder.setDataConverter(testEnvironmentOptions.getDataConverter());
144-
}
145142
builder = overrideOptions.apply(builder);
146143
return workerFactory.newWorker(taskList, builder.build());
147144
}

0 commit comments

Comments
 (0)