Skip to content

Commit 0ac68cd

Browse files
Merge pull request #4430 from sajinieKavindya/inbound-deactivate-feature
Refactor inbound endpoint deactivation logic
2 parents d02c733 + f00033b commit 0ac68cd

File tree

5 files changed

+145
-38
lines changed

5 files changed

+145
-38
lines changed

components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.synapse.startup.quartz.StartUpController;
2727
import org.apache.synapse.task.TaskDescription;
2828
import org.apache.synapse.task.TaskManager;
29+
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericOneTimeTask;
2930
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQTask;
3031
import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager;
3132

@@ -74,12 +75,17 @@ protected void start(OneTimeTriggerInboundTask task, String endpointPostfix) {
7475
startUpController.setTaskDescription(taskDescription);
7576
startUpController.init(synapseEnvironment);
7677
// registering a listener to identify task removal or deletions.
78+
TaskManager taskManagerImpl = synapseEnvironment.getTaskManager().getTaskManagerImpl();
7779
if (task instanceof RabbitMQTask) {
78-
TaskManager taskManagerImpl = synapseEnvironment.getTaskManager().getTaskManagerImpl();
7980
if (taskManagerImpl instanceof NTaskTaskManager) {
8081
((NTaskTaskManager) taskManagerImpl).registerListener((RabbitMQTask) task,
8182
taskDescription.getName());
8283
}
84+
} else if (task instanceof GenericOneTimeTask) {
85+
if (taskManagerImpl instanceof NTaskTaskManager) {
86+
((NTaskTaskManager) taskManagerImpl).registerListener((GenericOneTimeTask) task,
87+
taskDescription.getName());
88+
}
8389
}
8490
} catch (Exception e) {
8591
log.error("Error starting the inbound endpoint " + name + ". Unable to schedule the task. " + e

components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/Utils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.commons.logging.Log;
2323
import org.apache.commons.logging.LogFactory;
2424

25+
import java.lang.reflect.Method;
26+
import java.lang.reflect.Modifier;
2527
import java.util.concurrent.atomic.AtomicInteger;
2628

2729
public class Utils {
@@ -65,4 +67,40 @@ public static void waitForGracefulTaskCompletion(GracefulShutdownTimer gracefulS
6567
}
6668
}
6769
}
70+
71+
/**
72+
* Checks whether the specified method is implemented (i.e., not abstract) in the given class.
73+
*
74+
* <p>This utility method attempts to locate a method with the provided name and parameter types
75+
* in the given class. If the method exists and is not declared as {@code abstract}, it is
76+
* considered implemented and the method returns {@code true}. Otherwise, it returns {@code false}.
77+
* </p>
78+
*
79+
* <p>This check can be useful for verifying whether a custom class provides concrete
80+
* implementations for required interface or superclass methods before invoking them,
81+
* avoiding runtime {@link UnsupportedOperationException}s.</p>
82+
*
83+
* @param clazz the {@link Class} object to inspect
84+
* @param methodName the name of the method to check
85+
* @param paramTypes the parameter types of the method (if any)
86+
* @return {@code true} if the method exists and is not abstract, {@code false} otherwise
87+
*/
88+
public static boolean checkMethodImplementation(Class<?> clazz, String methodName, Class<?>... paramTypes) {
89+
if (log.isDebugEnabled()) {
90+
log.debug("Checking method implementation for: " + methodName + " in class: " + clazz.getName());
91+
}
92+
try {
93+
Method method = clazz.getDeclaredMethod(methodName, paramTypes);
94+
boolean isImplemented = !Modifier.isAbstract(method.getModifiers());
95+
if (log.isDebugEnabled()) {
96+
log.debug("Method " + methodName + " implementation status: " + isImplemented);
97+
}
98+
return isImplemented;
99+
} catch (NoSuchMethodException e) {
100+
if (log.isDebugEnabled()) {
101+
log.debug("Method " + methodName + " not found in class " + clazz.getName(), e);
102+
}
103+
return false;
104+
}
105+
}
68106
}

components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.synapse.task.TaskStartupObserver;
2828
import org.wso2.carbon.inbound.endpoint.common.InboundOneTimeTriggerEventBasedProcessor;
2929
import org.wso2.carbon.inbound.endpoint.protocol.PollingConstants;
30+
import org.wso2.carbon.inbound.endpoint.protocol.Utils;
3031

3132
import java.lang.reflect.Constructor;
3233
import java.lang.reflect.InvocationTargetException;
@@ -150,36 +151,42 @@ public void update() {
150151
}
151152

152153
public boolean activate() {
153-
try {
154-
eventConsumer.resume();
155-
} catch (AbstractMethodError e) {
156-
throw new UnsupportedOperationException("Unsupported operation 'resume()' for Inbound Endpoint: " + getName() +
157-
"If using a WSO2-released inbound, please upgrade to the latest version. " +
158-
"If this is a custom inbound, implement the 'resume' logic accordingly.");
154+
if (Utils.checkMethodImplementation(eventConsumer.getClass(), "resume")) {
155+
return super.activate();
156+
} else {
157+
throw new UnsupportedOperationException("Deactivation is not supported for Inbound Endpoint '" + getName()
158+
+ "'. To enable this functionality, ensure that the 'destroy()' and 'resume()' methods are "
159+
+ "properly implemented. If using a WSO2-released inbound, please upgrade to the latest version.");
159160
}
160-
return super.activate();
161161
}
162162

163163
@Override
164164
public boolean deactivate() {
165-
boolean isTaskDeactivated = super.deactivate();
165+
if (Utils.checkMethodImplementation(eventConsumer.getClass(), "destroy")
166+
&& Utils.checkMethodImplementation(eventConsumer.getClass(), "resume")) {
167+
boolean isTaskDeactivated = super.deactivate();
166168

167-
if (isTaskDeactivated) {
168-
try {
169+
if (isTaskDeactivated) {
169170
eventConsumer.destroy();
170-
} catch (AbstractMethodError e) {
171-
throw new UnsupportedOperationException("Unsupported operation 'pause()' for Inbound Endpoint: " + getName() +
172-
"If using a WSO2-released inbound, please upgrade to the latest version. " +
173-
"If this is a custom inbound, implement the 'pause' logic accordingly.");
171+
return true;
174172
}
173+
} else {
174+
throw new UnsupportedOperationException("Deactivation is not supported for Inbound Endpoint '" + getName()
175+
+ "'. To enable this functionality, ensure that the 'destroy()' and 'resume()' methods are "
176+
+ "properly implemented. If using a WSO2-released inbound, please upgrade to the latest version.");
175177
}
176-
177-
return isTaskDeactivated;
178+
return false;
178179
}
179180

180181
@Override
181182
public void pause() {
182-
183-
eventConsumer.pause();
183+
try {
184+
eventConsumer.pause();
185+
} catch (AbstractMethodError e) {
186+
if (log.isDebugEnabled()) {
187+
log.debug("Implement the 'pause()' method to enable graceful shutdown in your custom "
188+
+ "inbound endpoint: " + getName());
189+
}
190+
}
184191
}
185192
}

components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericOneTimeTask.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import org.apache.commons.logging.LogFactory;
2222
import org.apache.synapse.core.SynapseEnvironment;
2323
import org.wso2.carbon.inbound.endpoint.common.OneTimeTriggerInboundTask;
24+
import org.wso2.micro.integrator.ntask.core.impl.LocalTaskActionListener;
2425

25-
public class GenericOneTimeTask extends OneTimeTriggerInboundTask {
26+
public class GenericOneTimeTask extends OneTimeTriggerInboundTask implements LocalTaskActionListener {
2627

2728
private static final Log logger = LogFactory.getLog(GenericOneTimeTask.class.getName());
2829
private GenericEventBasedConsumer eventBasedConsumer;
@@ -49,4 +50,45 @@ public GenericEventBasedConsumer getEventBasedConsumer() {
4950
return eventBasedConsumer;
5051
}
5152

53+
/**
54+
* Method to notify when a local task is removed, it can be due to pause or delete.
55+
* Destroys the Generic task upon removal of the local task.
56+
*
57+
* @param taskName the name of the task that was deleted
58+
*/
59+
@Override
60+
public void notifyLocalTaskRemoval(String taskName) {
61+
logger.info("Removing Generic One Time task: " + taskName);
62+
try {
63+
eventBasedConsumer.destroy();
64+
} catch (AbstractMethodError e) {
65+
logger.warn("Task [" + taskName + "] : Unsupported operation 'destroy()' for Inbound Endpoint: "
66+
+ getName() + ". If using a WSO2-released inbound, please upgrade to the latest version. "
67+
+ "If this is a custom inbound, implement the 'destroy' logic accordingly.");
68+
}
69+
}
70+
71+
@Override
72+
public void notifyLocalTaskPause(String taskName) {
73+
logger.info("Pausing Generic One Time task: " + taskName);
74+
try {
75+
eventBasedConsumer.destroy();
76+
} catch (AbstractMethodError e) {
77+
logger.warn("Task [" + taskName + "] : Unsupported operation 'destroy()' for Inbound Endpoint: "
78+
+ getName() + ". If using a WSO2-released inbound, please upgrade to the latest version. "
79+
+ "If this is a custom inbound, implement the 'destroy' logic accordingly.");
80+
}
81+
}
82+
83+
@Override
84+
public void notifyLocalTaskResume(String taskName) {
85+
logger.info("Resuming Generic One Time task: " + taskName);
86+
try {
87+
eventBasedConsumer.resume();
88+
} catch (AbstractMethodError e) {
89+
logger.warn("Unsupported operation 'resume()' for Inbound Endpoint: " + getName() +
90+
". If using a WSO2-released inbound, please upgrade to the latest version. " +
91+
"If this is a custom inbound, implement the 'resume' logic accordingly.");
92+
}
93+
}
5294
}

components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.wso2.carbon.inbound.endpoint.common.InboundRequestProcessorImpl;
3030
import org.wso2.carbon.inbound.endpoint.common.InboundTask;
3131
import org.wso2.carbon.inbound.endpoint.protocol.PollingConstants;
32+
import org.wso2.carbon.inbound.endpoint.protocol.Utils;
3233

3334
import java.lang.reflect.Constructor;
3435
import java.lang.reflect.InvocationTargetException;
@@ -198,7 +199,14 @@ public void destroy() {
198199

199200
@Override
200201
public void pause() {
201-
202+
try {
203+
pollingConsumer.pause();
204+
} catch (AbstractMethodError e) {
205+
if (log.isDebugEnabled()) {
206+
log.debug("Implement the 'pause()' method to enable graceful shutdown in your custom "
207+
+ "inbound endpoint: " + getName());
208+
}
209+
}
202210
}
203211

204212
public String getName() {
@@ -215,29 +223,35 @@ public void update() {
215223

216224
@Override
217225
public boolean activate() {
218-
try {
219-
pollingConsumer.resume();
220-
} catch (AbstractMethodError e) {
221-
throw new UnsupportedOperationException("Unsupported operation 'resume()' for Inbound Endpoint: " + getName() +
222-
"If using a WSO2-released inbound, please upgrade to the latest version. " +
223-
"If this is a custom inbound, implement the 'resume' logic accordingly.");
226+
227+
if (Utils.checkMethodImplementation(pollingConsumer.getClass(), "resume")) {
228+
// After the task is resumed via super.activate(), the resume() method of the corresponding polling
229+
// consumer (where the task is scheduled) will be invoked within the 'GenericTask.notifyLocalTaskResume' method.
230+
231+
return super.activate();
232+
} else {
233+
throw new UnsupportedOperationException("Activation is not supported for Inbound Endpoint '" + getName()
234+
+ "'. To enable this functionality, ensure that the 'destroy()' and 'resume()' methods are "
235+
+ "properly implemented. If using a WSO2-released inbound, please upgrade to the latest version.");
224236
}
225-
return super.activate();
226237
}
227238

228239
@Override
229240
public boolean deactivate() {
230-
boolean isTaskDeactivated = super.deactivate();
231241

232-
if (isTaskDeactivated) {
233-
try {
234-
pollingConsumer.destroy();
235-
} catch (AbstractMethodError e) {
236-
throw new UnsupportedOperationException("Unsupported operation 'destroy()' for Inbound Endpoint: "
237-
+ getName() + "If using a WSO2-released inbound, please upgrade to the latest version. "
238-
+ "If this is a custom inbound, implement the 'destroy' logic accordingly.");
239-
}
242+
if (Utils.checkMethodImplementation(pollingConsumer.getClass(), "destroy")
243+
&& Utils.checkMethodImplementation(pollingConsumer.getClass(), "resume")) {
244+
// We check that both 'destroy' and 'resume' are implemented to ensure that existing customers who only
245+
// implemented 'destroy' do not end up in an inconsistent state due to a missing 'resume' implementation.
246+
247+
// After the task is paused via super.deactivate(), the destroy() method of the corresponding polling
248+
// consumer (where the task is scheduled) will be invoked within the 'GenericTask.notifyLocalTaskPause' method.
249+
250+
return super.deactivate();
251+
} else {
252+
throw new UnsupportedOperationException("Deactivation is not supported for Inbound Endpoint '"
253+
+ getName() + "'. To enable this functionality, ensure that the 'destroy()' and 'resume()' methods "
254+
+ "are properly implemented. If using a WSO2-released inbound, please upgrade to the latest version.");
240255
}
241-
return isTaskDeactivated;
242256
}
243257
}

0 commit comments

Comments
 (0)