Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scatter gather issue when used with template mediators #2326

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class LogMediatorFactory extends AbstractMediatorFactory {
protected static final QName ELEMENT_MESSAGE_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "message");
private static final QName ATT_LOG_FULL_PAYLOAD = new QName("logFullPayload");
private static final QName ATT_LOG_MESSAGE_ID = new QName("logMessageID");

public QName getTagQName() {
return LOG_Q;
Expand Down Expand Up @@ -110,6 +111,11 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
}
}

OMAttribute logMessageIDAttr = elem.getAttribute(ATT_LOG_MESSAGE_ID);
if (logMessageIDAttr != null && Boolean.parseBoolean(logMessageIDAttr.getAttributeValue())) {
logMediator.setLogMessageID(true);
}

// Set the log statement category (i.e. INFO, DEBUG, etc..)
OMAttribute category = elem.getAttribute(ATT_CATEGORY);
if (category != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public OMElement serializeSpecificMediator(Mediator m) {
OMElement messageElement = fac.createOMElement("message", synNS);
messageElement.setText(mediator.getMessageTemplate());
log.addChild(messageElement);
log.addAttribute(fac.createOMAttribute("logMessageID", nullNS,
Boolean.toString(mediator.isLogMessageID())));
break;
}
if (StringUtils.isNotBlank(logLevel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class LogMediator extends AbstractMediator {

private String messageTemplate = "";
private boolean isContentAware = false;
private boolean logMessageID = false;
private final Map<String, SynapseExpression> inlineExpressionCache = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -169,6 +170,18 @@ private String getLogMessage(MessageContext synCtx) {

private String getCustomLogMessage(MessageContext synCtx) {
StringBuffer sb = new StringBuffer();
if (logMessageID) {
if (synCtx.getMessageID() != null) {
sb.append("MessageID: ").append(synCtx.getMessageID());
}
if (getCorrelationId(synCtx) != null) {
sb.append(separator).append("correlation_id: ").append(getCorrelationId(synCtx));
}
// append separator if message id is logged
if (sb.length() > 0) {
sb.append(separator);
}
}
processMessageTemplate(sb, synCtx, messageTemplate);
setCustomProperties(sb, synCtx);
return trimLeadingSeparator(sb);
Expand Down Expand Up @@ -330,6 +343,16 @@ public void setMessageTemplate(String messageTemplate) {
this.messageTemplate = messageTemplate.replace("\\n", "\n").replace("\\t", "\t");
}

public boolean isLogMessageID() {

return logMessageID;
}

public void setLogMessageID(boolean logMessageID) {

this.logMessageID = logMessageID;
}

private String trimLeadingSeparator(StringBuffer sb) {
String retStr = sb.toString();
if (retStr.startsWith(separator)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,18 +258,23 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
synLog.traceOrDebug("Foreach mediator : Mediating from ContinuationState");
}

boolean result;
boolean result = false;
boolean readyToAggregate = false;
SequenceMediator branchSequence = target.getSequence();
boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
// If there are no children and the continuation was triggered from a mediator worker start aggregation
// otherwise mediate through the sub branch sequence
if (!continuationState.hasChild()) {
if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
readyToAggregate = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
if (result) {
readyToAggregate = true;
}
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
Expand All @@ -285,10 +290,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
if (continueWithoutAggregation) {
return false;
}
if (result) {
if (readyToAggregate) {
return aggregateMessages(synCtx, synLog);
}
return false;
return result;
}

private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
Expand Down Expand Up @@ -424,7 +429,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition());

getLog(originalMessageContext).traceOrDebug("End : Foreach mediator");
boolean result = false;
boolean result;

// Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed
originalMessageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
Expand All @@ -446,7 +451,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) {
}
} while (result && !originalMessageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext);
return result;
return false;
} else {
handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState");
}

boolean result;
boolean result = false;
boolean readyToAggregate = false;
int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();

SequenceMediator branchSequence = targets.get(subBranch).getSequence();
Expand All @@ -277,10 +278,14 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
if (!continuationState.hasChild()) {
if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) {
synLog.traceOrDebug("Continuation is triggered from a mediator worker");
result = true;
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
readyToAggregate = true;
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence");
result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
if (result) {
readyToAggregate = true;
}
}
} else {
synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state");
Expand All @@ -292,10 +297,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat
((Mediator) mediator).reportCloseStatistics(synCtx, null);
}
}
if (result) {
if (readyToAggregate) {
return aggregateMessages(synCtx, synLog);
}
return false;
return result;
}

private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
Expand Down Expand Up @@ -480,10 +485,9 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
if (Utils.isTargetBody(resultTarget)) {
// Set content type to the aggregated message
setContentType(messageContext);
} else {
// Update the continuation state to current mediator position as we are using the original message context
ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
}
// Update the continuation state to current mediator position
ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition());
messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);

if (RuntimeStatisticCollector.isStatisticsEnabled()) {
Expand All @@ -492,7 +496,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
}

getLog(messageContext).traceOrDebug("End : Scatter Gather mediator");
boolean result = false;
boolean result;
do {
SeqContinuationState seqContinuationState =
(SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext);
Expand All @@ -507,7 +511,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr
}
} while (result && !messageContext.getContinuationStateStack().isEmpty());
CloseEventCollector.closeEventsAfterScatterGather(messageContext);
return result;
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testForEachMediatorV2_continueWithoutAggregation() throws Exception
String inputXML = "<foreach collection=\"${payload.array}\" parallel-execution=\"true\" " +
"continue-without-aggregation=\"true\" xmlns=\"http://ws.apache.org/ns/synapse\">" +
"<sequence>" +
"<log>" +
"<log logMessageID=\"false\">" +
"<message>Processing payload ${payload}</message>" +
"</log>" +
"</sequence>" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,18 @@ private String getXmlOfMediatorScenarioOne(String level) {
}

private String getXmlOfLogMediatorWithTemplate() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message>Processing message with ID: 123</message></log>";
}

private String getXmlOfLogMediatorWithTemplateAndProps() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message>Processing message with ID: 123</message>" +
"<property name=\"Text\" value=\"Sending quote request\"/></log>";
}

private String getXmlOfLogMediatorWithEmptyTemplateAndProps() {
return "<log xmlns=\"http://ws.apache.org/ns/synapse\">" +
return "<log xmlns=\"http://ws.apache.org/ns/synapse\" logMessageID=\"false\">" +
"<message></message>" +
"<property name=\"Text\" value=\"Sending quote request\"/></log>";
}
Expand Down
Loading