Skip to content

Commit 4804c73

Browse files
Merge pull request #4447 from KalinduGandara/issue_4399
Fix Message processor duplication when coordination DB delay happen
2 parents f35a37b + e595d77 commit 4804c73

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import org.apache.commons.logging.Log;
2121
import org.apache.commons.logging.LogFactory;
22+
import org.apache.synapse.commons.util.MiscellaneousUtil;
23+
import org.apache.synapse.core.SynapseEnvironment;
24+
import org.apache.synapse.message.processor.MessageProcessor;
2225
import org.quartz.CronScheduleBuilder;
2326
import org.quartz.Job;
2427
import org.quartz.JobBuilder;
@@ -37,6 +40,7 @@
3740
import org.quartz.TriggerListener;
3841
import org.quartz.impl.matchers.GroupMatcher;
3942
import org.quartz.spi.OperableTrigger;
43+
import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils;
4044
import org.wso2.micro.integrator.ntask.common.TaskConstants;
4145
import org.wso2.micro.integrator.ntask.common.TaskException;
4246
import org.wso2.micro.integrator.ntask.coordination.TaskCoordinationException;
@@ -179,6 +183,15 @@ protected synchronized void pauseLocalTaskTemporarily(String taskName) throws Ta
179183
if (null != listener) {
180184
listener.notifyLocalTaskPause(taskName);
181185
}
186+
if (MiscellaneousUtil.isTaskOfMessageProcessor(taskName)) {
187+
SynapseEnvironment synapseEnvironment = MicroIntegratorBaseUtils.getSynapseEnvironment();
188+
String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(taskName);
189+
MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration()
190+
.getMessageProcessors().get(messageProcessorName);
191+
if (messageProcessor != null) {
192+
messageProcessor.pauseMessageProcessorTemporarily();
193+
}
194+
}
182195
log.info("Task temporarily paused: [" + this.getTaskType() + "][" + taskName + "]");
183196
} catch (SchedulerException e) {
184197
throw new TaskException("Error in temporarily pausing task with name: " + taskName,

0 commit comments

Comments
 (0)