Skip to content

Commit

Permalink
Merge pull request #2326 from SanojPunchihewa/bug-fixes
Browse files Browse the repository at this point in the history
Fix scatter gather issue when used with template mediators
  • Loading branch information
SanojPunchihewa authored Feb 11, 2025
2 parents 6fb13f7 + 7463be9 commit 2082f20
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class LogMediatorFactory extends AbstractMediatorFactory {
protected static final QName ELEMENT_MESSAGE_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "message");
private static final QName ATT_LOG_FULL_PAYLOAD = new QName("logFullPayload");
private static final QName ATT_LOG_MESSAGE_ID = new QName("logMessageID");

public QName getTagQName() {
return LOG_Q;
Expand Down Expand Up @@ -110,6 +111,11 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
}
}

OMAttribute logMessageIDAttr = elem.getAttribute(ATT_LOG_MESSAGE_ID);
if (logMessageIDAttr != null && Boolean.parseBoolean(logMessageIDAttr.getAttributeValue())) {
logMediator.setLogMessageID(true);
}

// Set the log statement category (i.e. INFO, DEBUG, etc..)
OMAttribute category = elem.getAttribute(ATT_CATEGORY);
if (category != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public OMElement serializeSpecificMediator(Mediator m) {
OMElement messageElement = fac.createOMElement("message", synNS);
messageElement.setText(mediator.getMessageTemplate());
log.addChild(messageElement);
log.addAttribute(fac.createOMAttribute("logMessageID", nullNS,
Boolean.toString(mediator.isLogMessageID())));
break;
}
if (StringUtils.isNotBlank(logLevel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class LogMediator extends AbstractMediator {

private String messageTemplate = "";
private boolean isContentAware = false;
private boolean logMessageID = false;
private final Map<String, SynapseExpression> inlineExpressionCache = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -169,6 +170,18 @@ private String getLogMessage(MessageContext synCtx) {

private String getCustomLogMessage(MessageContext synCtx) {
StringBuffer sb = new StringBuffer();
if (logMessageID) {
if (synCtx.getMessageID() != null) {
sb.append("MessageID: ").append(synCtx.getMessageID());
}
if (getCorrelationId(synCtx) != null) {
sb.append(separator).append("correlation_id: ").append(getCorrelationId(synCtx));
}
// append separator if message id is logged
if (sb.length() > 0) {
sb.append(separator);
}
}
processMessageTemplate(sb, synCtx, messageTemplate);
setCustomProperties(sb, synCtx);
return trimLeadingSeparator(sb);
Expand Down Expand Up @@ -330,6 +343,16 @@ public void setMessageTemplate(String messageTemplate) {
this.messageTemplate = messageTemplate.replace("\\n", "\n").replace("\\t", "\t");
}

public boolean isLogMessageID() {

return logMessageID;
}

public void setLogMessageID(boolean logMessageID) {

this.logMessageID = logMessageID;
}

private String trimLeadingSeparator(StringBuffer sb) {
String retStr = sb.toString();
if (retStr.startsWith(separator)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,18 +258,23 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
synLog.traceOrDebug("Foreach mediator : Mediating from ContinuationState");
}

boolean result;
boolean result = false;
boolean readyToAggregate = false;
SequenceMediator branchSequence = target.getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
// If there are no children and the continuation was triggered from a mediator worker start aggregation
// otherwise mediate through the sub branch sequence
if (!continuationState.hasChild()) {
if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
readyToAggregate = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
if (result) {
readyToAggregate = true;
}
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
Expand All @@ -285,10 +290,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
if (continueWithoutAggregation) {
return false;
}
if (result) {
if (readyToAggregate) {
return aggregateMessages(synCtx, synLog);
}
return false;
return result;
}

private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
Expand Down Expand Up @@ -424,7 +429,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition());

getLog(originalMessageContext).traceOrDebug("End : Foreach mediator");
boolean result = false;
boolean result;

// Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed
originalMessageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
Expand All @@ -446,7 +451,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
}
} while (result && !originalMessageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext);
return result;
return false;
} else {
handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState");
}

boolean result;
boolean result = false;
boolean readyToAggregate = false;
int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();

SequenceMediator branchSequence = targets.get(subBranch).getSequence();
Expand All @@ -277,10 +278,14 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
if (!continuationState.hasChild()) {
if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
readyToAggregate = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
if (result) {
readyToAggregate = true;
}
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
Expand All @@ -292,10 +297,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
}
if (result) {
if (readyToAggregate) {
return aggregateMessages(synCtx, synLog);
}
return false;
return result;
}

private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
Expand Down Expand Up @@ -480,10 +485,9 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
if (Utils.isTargetBody(resultTarget)) {
// Set content type to the aggregated message
setContentType(messageContext);
} else {
// Update the continuation state to current mediator position as we are using the original message context
ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
}
// Update the continuation state to current mediator position
ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);

if (RuntimeStatisticCollector.isStatisticsEnabled()) {
Expand All @@ -492,7 +496,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
}

getLog(messageContext).traceOrDebug("End : Scatter Gather mediator");
boolean result = false;
boolean result;
do {
SeqContinuationState seqContinuationState =
(SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext);
Expand All @@ -507,7 +511,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
}
} while (result && !messageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(messageContext);
return result;
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testForEachMediatorV2_continueWithoutAggregation() throws Exception
String inputXML = "<foreach collection=\"${payload.array}\" parallel-execution=\"true\" " +
"continue-without-aggregation=\"true\" xmlns=\"http://ws.apache.org/ns/synapse\">" +
"<sequence>" +
"<log>" +
"<log logMessageID=\"false\">" +
"<message>Processing payload ${payload}</message>" +
"</log>" +
"</sequence>" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,18 @@ private String getXmlOfMediatorScenarioOne(String level) {
}

private String getXmlOfLogMediatorWithTemplate() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message>Processing message with ID: 123</message></log>";
}

private String getXmlOfLogMediatorWithTemplateAndProps() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message>Processing message with ID: 123</message>" +
"<property name=\"Text\" value=\"Sending quote request\"/></log>";
}

private String getXmlOfLogMediatorWithEmptyTemplateAndProps() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message></message>" +
"<property name=\"Text\" value=\"Sending quote request\"/></log>";
}
Expand Down

0 comments on commit 2082f20

Please sign in to comment.