Skip to content

Commit 68767ae

Browse files
mfateevLiang Mei
authored and
Liang Mei
committed
Added support for calling polymorphic activities
1 parent acb797f commit 68767ae

File tree

8 files changed

+135
-38
lines changed

8 files changed

+135
-38
lines changed

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
3030
private final WorkflowInterceptor activityExecutor;
3131

3232
static InvocationHandler newInstance(
33-
ActivityOptions options, WorkflowInterceptor activityExecutor) {
34-
return new ActivityInvocationHandler(options, activityExecutor);
33+
Class<?> activityInterface, ActivityOptions options, WorkflowInterceptor activityExecutor) {
34+
return new ActivityInvocationHandler(activityInterface, activityExecutor, options);
3535
}
3636

37-
private ActivityInvocationHandler(ActivityOptions options, WorkflowInterceptor activityExecutor) {
37+
private ActivityInvocationHandler(
38+
Class<?> activityInterface, WorkflowInterceptor activityExecutor, ActivityOptions options) {
3839
this.options = options;
3940
this.activityExecutor = activityExecutor;
41+
init(activityInterface);
4042
}
4143

4244
@Override

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java

+28-24
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import static com.uber.cadence.internal.common.InternalUtils.getValueOrDefault;
21+
import static com.uber.cadence.internal.sync.POJOActivityTaskHandler.getAnnotatedInterfaceMethodsFromInterface;
2122

23+
import com.uber.cadence.activity.ActivityInterface;
2224
import com.uber.cadence.activity.ActivityMethod;
2325
import com.uber.cadence.common.MethodRetry;
2426
import com.uber.cadence.internal.common.InternalUtils;
2527
import com.uber.cadence.internal.sync.AsyncInternal.AsyncMarker;
26-
import com.uber.cadence.workflow.Workflow;
2728
import java.lang.reflect.InvocationHandler;
2829
import java.lang.reflect.Method;
2930
import java.lang.reflect.Proxy;
3031
import java.util.HashMap;
3132
import java.util.Map;
33+
import java.util.Set;
3234
import java.util.function.Function;
3335

3436
/** Dynamic implementation of a strongly typed child workflow interface. */
@@ -45,33 +47,35 @@ static <T> T newProxy(Class<T> activityInterface, InvocationHandler invocationHa
4547
invocationHandler);
4648
}
4749

50+
protected void init(Class<?> activityInterface) {
51+
Set<POJOActivityTaskHandler.MethodInterfacePair> activityMethods =
52+
getAnnotatedInterfaceMethodsFromInterface(activityInterface, ActivityInterface.class);
53+
if (activityMethods.isEmpty()) {
54+
throw new IllegalArgumentException(
55+
"Class doesn't implement any non empty interface annotated with @ActivityInterface: "
56+
+ activityInterface.getName());
57+
}
58+
for (POJOActivityTaskHandler.MethodInterfacePair pair : activityMethods) {
59+
Method method = pair.getMethod();
60+
ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class);
61+
String activityType;
62+
if (activityMethod != null && !activityMethod.name().isEmpty()) {
63+
activityType = activityMethod.name();
64+
} else {
65+
activityType = InternalUtils.getSimpleName(pair.getType(), method);
66+
}
67+
68+
MethodRetry methodRetry = method.getAnnotation(MethodRetry.class);
69+
Function<Object[], Object> function = getActivityFunc(method, methodRetry, activityType);
70+
methodFunctions.put(method, function);
71+
}
72+
}
73+
4874
@Override
4975
public Object invoke(Object proxy, Method method, Object[] args) {
5076
Function<Object[], Object> function = methodFunctions.get(method);
5177
if (function == null) {
52-
try {
53-
if (method.equals(Object.class.getMethod("toString"))) {
54-
// TODO: activity info
55-
return "ActivityInvocationHandlerBase";
56-
}
57-
if (!method.getDeclaringClass().isInterface()) {
58-
throw new IllegalArgumentException(
59-
"Interface type is expected: " + method.getDeclaringClass());
60-
}
61-
MethodRetry methodRetry = method.getAnnotation(MethodRetry.class);
62-
ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class);
63-
String activityName;
64-
if (activityMethod == null || activityMethod.name().isEmpty()) {
65-
activityName = InternalUtils.getSimpleName(method);
66-
} else {
67-
activityName = activityMethod.name();
68-
}
69-
70-
function = getActivityFunc(method, methodRetry, activityName);
71-
methodFunctions.put(method, function);
72-
} catch (NoSuchMethodException e) {
73-
throw Workflow.wrap(e);
74-
}
78+
throw new IllegalArgumentException("Unexpected method: " + method);
7579
}
7680
return getValueOrDefault(function.apply(args), method.getReturnType());
7781
}

src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,19 @@ class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase {
3030
private final WorkflowInterceptor activityExecutor;
3131

3232
static InvocationHandler newInstance(
33-
LocalActivityOptions options, WorkflowInterceptor activityExecutor) {
34-
return new LocalActivityInvocationHandler(options, activityExecutor);
33+
Class<?> activityInterface,
34+
LocalActivityOptions options,
35+
WorkflowInterceptor activityExecutor) {
36+
return new LocalActivityInvocationHandler(activityInterface, activityExecutor, options);
3537
}
3638

3739
private LocalActivityInvocationHandler(
38-
LocalActivityOptions options, WorkflowInterceptor activityExecutor) {
40+
Class<?> activityInterface,
41+
WorkflowInterceptor activityExecutor,
42+
LocalActivityOptions options) {
3943
this.options = options;
4044
this.activityExecutor = activityExecutor;
45+
init(activityInterface);
4146
}
4247

4348
@Override

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

+28-5
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private void addActivityImplementation(
8888
}
8989
}
9090
Set<MethodInterfacePair> activityMethods =
91-
getAnnotatedInterfaceMethods(cls, ActivityInterface.class);
91+
getAnnotatedInterfaceMethodsFromImplementation(cls, ActivityInterface.class);
9292
if (activityMethods.isEmpty()) {
9393
throw new IllegalArgumentException(
9494
"Class doesn't implement any non empty interface annotated with @ActivityInterface: "
@@ -338,7 +338,7 @@ public int hashCode() {
338338
}
339339
}
340340

341-
Set<MethodInterfacePair> getAnnotatedInterfaceMethods(
341+
static Set<MethodInterfacePair> getAnnotatedInterfaceMethodsFromImplementation(
342342
Class<?> implementationClass, Class<? extends Annotation> annotationClass) {
343343
if (implementationClass.isInterface()) {
344344
throw new IllegalArgumentException(
@@ -347,11 +347,33 @@ Set<MethodInterfacePair> getAnnotatedInterfaceMethods(
347347
Set<MethodInterfacePair> pairs = new HashSet<>();
348348
// Methods inherited from interfaces that are not annotated with @ActivityInterface
349349
Set<MethodWrapper> ignored = new HashSet<>();
350-
getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs);
350+
getAnnotatedInterfaceMethodsFromImplementation(
351+
implementationClass, annotationClass, ignored, pairs);
351352
return pairs;
352353
}
353354

354-
private void getAnnotatedInterfaceMethods(
355+
static Set<MethodInterfacePair> getAnnotatedInterfaceMethodsFromInterface(
356+
Class<?> iClass, Class<? extends Annotation> annotationClass) {
357+
if (!iClass.isInterface()) {
358+
throw new IllegalArgumentException("Interface expected. Found: " + iClass.getSimpleName());
359+
}
360+
Annotation annotation = iClass.getAnnotation(annotationClass);
361+
if (annotation == null) {
362+
throw new IllegalArgumentException(
363+
"@ActivityInterface annotation is required on the stub interface: "
364+
+ iClass.getSimpleName());
365+
}
366+
Set<MethodInterfacePair> pairs = new HashSet<>();
367+
// Methods inherited from interfaces that are not annotated with @ActivityInterface
368+
Set<MethodWrapper> ignored = new HashSet<>();
369+
getAnnotatedInterfaceMethodsFromImplementation(iClass, annotationClass, ignored, pairs);
370+
if (!ignored.isEmpty()) {
371+
throw new IllegalStateException("Not empty ignored: " + ignored);
372+
}
373+
return pairs;
374+
}
375+
376+
private static void getAnnotatedInterfaceMethodsFromImplementation(
355377
Class<?> current,
356378
Class<? extends Annotation> annotationClass,
357379
Set<MethodWrapper> methods,
@@ -368,7 +390,8 @@ private void getAnnotatedInterfaceMethods(
368390
Class<?>[] interfaces = current.getInterfaces();
369391
for (int i = 0; i < interfaces.length; i++) {
370392
Class<?> anInterface = interfaces[i];
371-
getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result);
393+
getAnnotatedInterfaceMethodsFromImplementation(
394+
anInterface, annotationClass, ourMethods, result);
372395
}
373396
Annotation annotation = current.getAnnotation(annotationClass);
374397
if (annotation == null) {

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ public <T> T newActivityStub(Class<T> activityInterface) {
177177
.setHeartbeatTimeout(Duration.ofSeconds(1))
178178
.build();
179179
InvocationHandler invocationHandler =
180-
ActivityInvocationHandler.newInstance(options, new TestActivityExecutor(workflowService));
180+
ActivityInvocationHandler.newInstance(
181+
activityInterface, options, new TestActivityExecutor(workflowService));
181182
invocationHandler = new DeterministicRunnerWrapper(invocationHandler);
182183
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
183184
}

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public static long currentTimeMillis() {
149149
public static <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
150150
WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor();
151151
InvocationHandler invocationHandler =
152-
ActivityInvocationHandler.newInstance(options, decisionContext);
152+
ActivityInvocationHandler.newInstance(activityInterface, options, decisionContext);
153153
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
154154
}
155155

@@ -162,7 +162,7 @@ public static <T> T newLocalActivityStub(
162162
Class<T> activityInterface, LocalActivityOptions options) {
163163
WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor();
164164
InvocationHandler invocationHandler =
165-
LocalActivityInvocationHandler.newInstance(options, decisionContext);
165+
LocalActivityInvocationHandler.newInstance(activityInterface, options, decisionContext);
166166
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
167167
}
168168

src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java

+61
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.uber.cadence.workflow.ActivityFailureException;
3434
import io.netty.util.internal.ConcurrentSet;
3535
import java.io.IOException;
36+
import java.util.ArrayList;
37+
import java.util.List;
3638
import java.util.concurrent.atomic.AtomicInteger;
3739
import java.util.concurrent.atomic.AtomicReference;
3840
import org.apache.thrift.TException;
@@ -252,4 +254,63 @@ public void testHeartbeatIntermittentError() throws TException, InterruptedExcep
252254
activity.activity1();
253255
assertEquals(3, count.get());
254256
}
257+
258+
public interface A {
259+
void a();
260+
}
261+
262+
@ActivityInterface
263+
public interface B extends A {
264+
void b();
265+
}
266+
267+
@ActivityInterface
268+
public interface C extends B, A {
269+
void c();
270+
}
271+
272+
public class CImpl implements C {
273+
private List<String> invocations = new ArrayList<>();
274+
275+
@Override
276+
public void a() {
277+
invocations.add("a");
278+
}
279+
280+
@Override
281+
public void b() {
282+
invocations.add("b");
283+
}
284+
285+
@Override
286+
public void c() {
287+
invocations.add("c");
288+
}
289+
}
290+
291+
@Test
292+
public void testInvokingActivityByBaseInterface() {
293+
CImpl impl = new CImpl();
294+
testEnvironment.registerActivitiesImplementations(impl);
295+
try {
296+
testEnvironment.newActivityStub(A.class);
297+
fail("A doesn't implement activity");
298+
} catch (IllegalArgumentException e) {
299+
// expected as A doesn't implement any activity
300+
}
301+
B b = testEnvironment.newActivityStub(B.class);
302+
b.a();
303+
b.b();
304+
C c = testEnvironment.newActivityStub(C.class);
305+
c.a();
306+
c.b();
307+
c.c();
308+
List<String> expected = new ArrayList<>();
309+
expected.add("a");
310+
expected.add("b");
311+
expected.add("a");
312+
expected.add("b");
313+
expected.add("c");
314+
assertEquals(expected, impl.invocations);
315+
}
255316
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -5441,6 +5441,7 @@ public interface GreetingWorkflow {
54415441
void createGreeting(String name);
54425442
}
54435443

5444+
@ActivityInterface
54445445
public interface GreetingActivities {
54455446
@ActivityMethod
54465447
String composeGreeting(String string);

0 commit comments

Comments
 (0)