Skip to content

Commit 6060866

Browse files
mfateevLiang Mei
authored and
Liang Mei
committed
Added @ActivityInterface implementation to POJOActivityTaskHandler
1 parent 7b4b3d2 commit 6060866

12 files changed

+366
-51
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2+
* Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3+
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
24
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
35
*
4-
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5-
*
66
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
77
* use this file except in compliance with the License. A copy of the License is
88
* located at
@@ -51,7 +51,11 @@ public final class InternalUtils {
5151
* @return "Simple class name"::"methodName"
5252
*/
5353
public static String getSimpleName(Method method) {
54-
return method.getDeclaringClass().getSimpleName() + "::" + method.getName();
54+
return getSimpleName(method.getDeclaringClass(), method);
55+
}
56+
57+
public static String getSimpleName(Class<?> type, Method method) {
58+
return type.getSimpleName() + "::" + method.getName();
5559
}
5660

5761
public static String getWorkflowType(Method method, WorkflowMethod workflowMethod) {

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

+131-24
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2+
* Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3+
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
24
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
35
*
4-
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5-
*
66
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
77
* use this file except in compliance with the License. A copy of the License is
88
* located at
@@ -18,10 +18,11 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import com.google.common.base.Joiner;
21-
import com.google.common.reflect.TypeToken;
21+
import com.google.common.base.Objects;
2222
import com.uber.cadence.PollForActivityTaskResponse;
2323
import com.uber.cadence.RespondActivityTaskCompletedRequest;
2424
import com.uber.cadence.RespondActivityTaskFailedRequest;
25+
import com.uber.cadence.activity.ActivityInterface;
2526
import com.uber.cadence.activity.ActivityMethod;
2627
import com.uber.cadence.client.ActivityCancelledException;
2728
import com.uber.cadence.common.MethodRetry;
@@ -33,11 +34,14 @@
3334
import com.uber.cadence.serviceclient.IWorkflowService;
3435
import com.uber.cadence.testing.SimulatedTimeoutException;
3536
import com.uber.m3.tally.Scope;
37+
import java.lang.annotation.Annotation;
3638
import java.lang.reflect.InvocationTargetException;
3739
import java.lang.reflect.Method;
3840
import java.util.Collections;
3941
import java.util.HashMap;
42+
import java.util.HashSet;
4043
import java.util.Map;
44+
import java.util.Set;
4145
import java.util.concurrent.CancellationException;
4246
import java.util.concurrent.ScheduledExecutorService;
4347
import java.util.function.BiFunction;
@@ -82,30 +86,29 @@ private void addActivityImplementation(
8286
+ "\" This annotation can be used only on the interface method it implements.");
8387
}
8488
}
85-
TypeToken<?>.TypeSet interfaces = TypeToken.of(cls).getTypes().interfaces();
86-
if (interfaces.isEmpty()) {
87-
throw new IllegalArgumentException("Activity must implement at least one interface");
89+
Set<MethodInterfacePair> activityMethods =
90+
getAnnotatedInterfaceMethods(cls, ActivityInterface.class);
91+
if (activityMethods.isEmpty()) {
92+
throw new IllegalArgumentException(
93+
"Class doesn't implement any non empty interface annotated with @ActivityInterface: "
94+
+ cls.getName());
8895
}
89-
for (TypeToken<?> i : interfaces) {
90-
if (i.getType().getTypeName().startsWith("org.mockito")) {
91-
continue;
96+
for (MethodInterfacePair pair : activityMethods) {
97+
Method method = pair.getMethod();
98+
ActivityMethod annotation = method.getAnnotation(ActivityMethod.class);
99+
String activityType;
100+
if (annotation != null && !annotation.name().isEmpty()) {
101+
activityType = annotation.name();
102+
} else {
103+
activityType = InternalUtils.getSimpleName(pair.getType(), method);
92104
}
93-
for (Method method : i.getRawType().getMethods()) {
94-
ActivityMethod annotation = method.getAnnotation(ActivityMethod.class);
95-
String activityType;
96-
if (annotation != null && !annotation.name().isEmpty()) {
97-
activityType = annotation.name();
98-
} else {
99-
activityType = InternalUtils.getSimpleName(method);
100-
}
101-
if (activities.containsKey(activityType)) {
102-
throw new IllegalStateException(
103-
activityType + " activity type is already registered with the worker");
104-
}
105-
106-
ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity);
107-
activities.put(activityType, implementation);
105+
if (activities.containsKey(activityType)) {
106+
throw new IllegalStateException(
107+
activityType + " activity type is already registered with the worker");
108108
}
109+
110+
ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity);
111+
activities.put(activityType, implementation);
109112
}
110113
}
111114

@@ -266,4 +269,108 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
266269
void setWorkflowService(IWorkflowService service) {
267270
this.service = service;
268271
}
272+
273+
static class MethodInterfacePair {
274+
private final Method method;
275+
private final Class<?> type;
276+
277+
MethodInterfacePair(Method method, Class<?> type) {
278+
this.method = method;
279+
this.type = type;
280+
}
281+
282+
public Method getMethod() {
283+
return method;
284+
}
285+
286+
public Class<?> getType() {
287+
return type;
288+
}
289+
290+
@Override
291+
public boolean equals(Object o) {
292+
if (this == o) return true;
293+
if (o == null || getClass() != o.getClass()) return false;
294+
MethodInterfacePair that = (MethodInterfacePair) o;
295+
return Objects.equal(method, that.method) && Objects.equal(type, that.type);
296+
}
297+
298+
@Override
299+
public int hashCode() {
300+
return Objects.hashCode(method, type);
301+
}
302+
303+
@Override
304+
public String toString() {
305+
return "MethodInterfacePair{" + "method=" + method + ", type=" + type + '}';
306+
}
307+
}
308+
309+
/** Used to override equals and hashCode of Method to ensure deduping by method name in a set. */
310+
static class MethodWrapper {
311+
private final Method method;
312+
313+
MethodWrapper(Method method) {
314+
this.method = method;
315+
}
316+
317+
public Method getMethod() {
318+
return method;
319+
}
320+
321+
@Override
322+
public boolean equals(Object o) {
323+
if (this == o) return true;
324+
if (o == null || getClass() != o.getClass()) return false;
325+
MethodWrapper that = (MethodWrapper) o;
326+
return Objects.equal(method.getName(), that.method.getName());
327+
}
328+
329+
@Override
330+
public int hashCode() {
331+
return Objects.hashCode(method.getName());
332+
}
333+
}
334+
335+
Set<MethodInterfacePair> getAnnotatedInterfaceMethods(
336+
Class<?> implementationClass, Class<? extends Annotation> annotationClass) {
337+
if (implementationClass.isInterface()) {
338+
throw new IllegalArgumentException(
339+
"Concrete class expected. Found interface: " + implementationClass.getSimpleName());
340+
}
341+
Set<MethodInterfacePair> pairs = new HashSet<>();
342+
// Methods inherited from interfaces that are not annotated with @ActivityInterface
343+
Set<MethodWrapper> ignored = new HashSet<>();
344+
getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs);
345+
return pairs;
346+
}
347+
348+
private void getAnnotatedInterfaceMethods(
349+
Class<?> current,
350+
Class<? extends Annotation> annotationClass,
351+
Set<MethodWrapper> methods,
352+
Set<MethodInterfacePair> result) {
353+
// Using set to dedupe methods which are defined in both non activity parent and current
354+
Set<MethodWrapper> ourMethods = new HashSet<>();
355+
if (current.isInterface()) {
356+
Method[] declaredMethods = current.getDeclaredMethods();
357+
for (int i = 0; i < declaredMethods.length; i++) {
358+
Method declaredMethod = declaredMethods[i];
359+
ourMethods.add(new MethodWrapper(declaredMethod));
360+
}
361+
}
362+
Class<?>[] interfaces = current.getInterfaces();
363+
for (int i = 0; i < interfaces.length; i++) {
364+
Class<?> anInterface = interfaces[i];
365+
getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result);
366+
}
367+
Annotation annotation = current.getAnnotation(annotationClass);
368+
if (annotation == null) {
369+
methods.addAll(ourMethods);
370+
return;
371+
}
372+
for (MethodWrapper method : ourMethods) {
373+
result.add(new MethodInterfacePair(method.getMethod(), current));
374+
}
375+
}
269376
}

src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public String foo() {
155155
}
156156
}
157157

158+
@ActivityInterface
158159
public interface LocalActivityContextPropagation {
159160

160161
@ActivityMethod

src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
2828
import com.uber.cadence.activity.Activity;
29+
import com.uber.cadence.activity.ActivityInterface;
2930
import com.uber.cadence.client.ActivityCancelledException;
3031
import com.uber.cadence.serviceclient.IWorkflowService;
3132
import com.uber.cadence.testing.TestActivityEnvironment;
@@ -47,6 +48,7 @@ public void setUp() {
4748
testEnvironment = TestActivityEnvironment.newInstance();
4849
}
4950

51+
@ActivityInterface
5052
public interface TestActivity {
5153

5254
String activity1(String input);
@@ -110,6 +112,7 @@ public void testHeartbeat() {
110112
assertEquals("details1", details.get());
111113
}
112114

115+
@ActivityInterface
113116
public interface InterruptibleTestActivity {
114117
void activity1() throws InterruptedException;
115118
}

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.uber.cadence.WorkflowExecution;
3737
import com.uber.cadence.WorkflowExecutionInfo;
3838
import com.uber.cadence.activity.Activity;
39+
import com.uber.cadence.activity.ActivityInterface;
3940
import com.uber.cadence.activity.ActivityOptions;
4041
import com.uber.cadence.client.*;
4142
import com.uber.cadence.context.ContextPropagator;
@@ -161,6 +162,7 @@ public void testFailure() {
161162
}
162163
}
163164

165+
@ActivityInterface
164166
public interface TestActivity {
165167
String activity1(String input);
166168
}
@@ -486,6 +488,7 @@ public void testConcurrentDecision() throws ExecutionException, InterruptedExcep
486488
log.info(testEnvironment.getDiagnostics());
487489
}
488490

491+
@ActivityInterface
489492
public interface TestCancellationActivity {
490493
String activity1(String input);
491494
}

0 commit comments

Comments
 (0)