diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorFactory.java index 5f9bf96bcf..28f3efb305 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorFactory.java @@ -58,6 +58,7 @@ public class LogMediatorFactory extends AbstractMediatorFactory { protected static final QName ELEMENT_MESSAGE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "message"); private static final QName ATT_LOG_FULL_PAYLOAD = new QName("logFullPayload"); + private static final QName ATT_LOG_MESSAGE_ID = new QName("logMessageID"); public QName getTagQName() { return LOG_Q; @@ -110,6 +111,11 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { } } + OMAttribute logMessageIDAttr = elem.getAttribute(ATT_LOG_MESSAGE_ID); + if (logMessageIDAttr != null && Boolean.parseBoolean(logMessageIDAttr.getAttributeValue())) { + logMediator.setLogMessageID(true); + } + // Set the log statement category (i.e. INFO, DEBUG, etc..) OMAttribute category = elem.getAttribute(ATT_CATEGORY); if (category != null) { diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorSerializer.java index aa48ffd686..decdca88dd 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/LogMediatorSerializer.java @@ -59,6 +59,8 @@ public OMElement serializeSpecificMediator(Mediator m) { OMElement messageElement = fac.createOMElement("message", synNS); messageElement.setText(mediator.getMessageTemplate()); log.addChild(messageElement); + log.addAttribute(fac.createOMAttribute("logMessageID", nullNS, + Boolean.toString(mediator.isLogMessageID()))); break; } if (StringUtils.isNotBlank(logLevel)) { diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/builtin/LogMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/builtin/LogMediator.java index e8da643676..9566ff4167 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/builtin/LogMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/builtin/LogMediator.java @@ -84,6 +84,7 @@ public class LogMediator extends AbstractMediator { private String messageTemplate = ""; private boolean isContentAware = false; + private boolean logMessageID = false; private final Map inlineExpressionCache = new ConcurrentHashMap<>(); /** @@ -169,6 +170,18 @@ private String getLogMessage(MessageContext synCtx) { private String getCustomLogMessage(MessageContext synCtx) { StringBuffer sb = new StringBuffer(); + if (logMessageID) { + if (synCtx.getMessageID() != null) { + sb.append("MessageID: ").append(synCtx.getMessageID()); + } + if (getCorrelationId(synCtx) != null) { + sb.append(separator).append("correlation_id: ").append(getCorrelationId(synCtx)); + } + // append separator if message id is logged + if (sb.length() > 0) { + sb.append(separator); + } + } processMessageTemplate(sb, synCtx, messageTemplate); setCustomProperties(sb, synCtx); return trimLeadingSeparator(sb); @@ -330,6 +343,16 @@ public void setMessageTemplate(String messageTemplate) { this.messageTemplate = messageTemplate.replace("\\n", "\n").replace("\\t", "\t"); } + public boolean isLogMessageID() { + + return logMessageID; + } + + public void setLogMessageID(boolean logMessageID) { + + this.logMessageID = logMessageID; + } + private String trimLeadingSeparator(StringBuffer sb) { String retStr = sb.toString(); if (retStr.startsWith(separator)) { diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java index b041156020..0e95604aa8 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java @@ -258,7 +258,8 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat synLog.traceOrDebug("Foreach mediator : Mediating from ContinuationState"); } - boolean result; + boolean result = false; + boolean readyToAggregate = false; SequenceMediator branchSequence = target.getSequence(); boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); // If there are no children and the continuation was triggered from a mediator worker start aggregation @@ -266,10 +267,14 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat if (!continuationState.hasChild()) { if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) { synLog.traceOrDebug("Continuation is triggered from a mediator worker"); - result = true; + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false); + readyToAggregate = true; } else { synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence"); result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + if (result) { + readyToAggregate = true; + } } } else { synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state"); @@ -285,10 +290,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat if (continueWithoutAggregation) { return false; } - if (result) { + if (readyToAggregate) { return aggregateMessages(synCtx, synLog); } - return false; + return result; } private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { @@ -424,7 +429,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) { ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition()); getLog(originalMessageContext).traceOrDebug("End : Foreach mediator"); - boolean result = false; + boolean result; // Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed originalMessageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); @@ -446,7 +451,7 @@ private boolean completeAggregate(ForEachAggregate aggregate) { } } while (result && !originalMessageContext.getContinuationStateStack().isEmpty()); CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext); - return result; + return false; } else { handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage()); return false; diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index 8cc1ceb3e5..d6f271ae31 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -267,7 +267,8 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState"); } - boolean result; + boolean result = false; + boolean readyToAggregate = false; int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); SequenceMediator branchSequence = targets.get(subBranch).getSequence(); @@ -277,10 +278,14 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat if (!continuationState.hasChild()) { if (Utils.isContinuationTriggeredFromMediatorWorker(synCtx)) { synLog.traceOrDebug("Continuation is triggered from a mediator worker"); - result = true; + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false); + readyToAggregate = true; } else { synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the sub branch sequence"); result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + if (result) { + readyToAggregate = true; + } } } else { synLog.traceOrDebug("Continuation is triggered from a callback, mediating through the child continuation state"); @@ -292,10 +297,10 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat ((Mediator) mediator).reportCloseStatistics(synCtx, null); } } - if (result) { + if (readyToAggregate) { return aggregateMessages(synCtx, synLog); } - return false; + return result; } private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { @@ -480,10 +485,9 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr if (Utils.isTargetBody(resultTarget)) { // Set content type to the aggregated message setContentType(messageContext); - } else { - // Update the continuation state to current mediator position as we are using the original message context - ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition()); } + // Update the continuation state to current mediator position + ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition()); messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); if (RuntimeStatisticCollector.isStatisticsEnabled()) { @@ -492,7 +496,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr } getLog(messageContext).traceOrDebug("End : Scatter Gather mediator"); - boolean result = false; + boolean result; do { SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext); @@ -507,7 +511,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr } } while (result && !messageContext.getContinuationStateStack().isEmpty()); CloseEventCollector.closeEventsAfterScatterGather(messageContext); - return result; + return false; } /** diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ForEachMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ForEachMediatorSerializationTest.java index 940255c2db..e61220040b 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/ForEachMediatorSerializationTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ForEachMediatorSerializationTest.java @@ -62,7 +62,7 @@ public void testForEachMediatorV2_continueWithoutAggregation() throws Exception String inputXML = "" + "" + - "" + + "" + "Processing payload ${payload}" + "" + "" + diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/LogMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/LogMediatorSerializationTest.java index 8d9e4ca7bf..59f3e80b7a 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/LogMediatorSerializationTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/LogMediatorSerializationTest.java @@ -138,18 +138,18 @@ private String getXmlOfMediatorScenarioOne(String level) { } private String getXmlOfLogMediatorWithTemplate() { - return "" + + return "" + "Processing message with ID: 123"; } private String getXmlOfLogMediatorWithTemplateAndProps() { - return "" + + return "" + "Processing message with ID: 123" + ""; } private String getXmlOfLogMediatorWithEmptyTemplateAndProps() { - return "" + + return "" + "" + ""; }