Skip to content

Commit 8e410ba

Browse files
Add synapse expression support and option to set output to a variable
1 parent b61a0c8 commit 8e410ba

File tree

7 files changed

+307
-79
lines changed

7 files changed

+307
-79
lines changed

modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.apache.axiom.om.OMAttribute;
2222
import org.apache.axiom.om.OMElement;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.synapse.Mediator;
25+
import org.apache.synapse.SynapseException;
2426
import org.apache.synapse.mediators.eip.Target;
2527
import org.apache.synapse.mediators.v2.ScatterGather;
2628
import org.jaxen.JaxenException;
@@ -34,7 +36,7 @@
3436
* different message contexts and aggregate the responses back.
3537
*
3638
* <pre>
37-
* &lt;scatter-gather parallel-execution=(true | false)&gt;
39+
* &lt;scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)&gt;
3840
* &lt;aggregation value="expression" condition="expression" timeout="long"
3941
* min-messages="expression" max-messages="expression"/&gt;
4042
* &lt;sequence&gt;
@@ -59,6 +61,8 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
5961
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
6062
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
6163
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
64+
private static final QName RESULT_TARGET_Q = new QName("result-target");
65+
private static final QName CONTENT_TYPE_Q = new QName("content-type");
6266

6367
private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
6468

@@ -73,10 +77,36 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
7377
if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
7478
asynchronousExe = false;
7579
}
76-
7780
mediator.setParallelExecution(asynchronousExe);
7881

82+
OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q);
83+
if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
84+
String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator";
85+
throw new SynapseException(msg);
86+
} else {
87+
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
88+
mediator.setContentType(ScatterGather.JSON_TYPE);
89+
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
90+
mediator.setContentType(ScatterGather.XML_TYPE);
91+
} else {
92+
String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
93+
throw new SynapseException(msg);
94+
}
95+
}
96+
97+
OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
98+
if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) {
99+
String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator";
100+
throw new SynapseException(msg);
101+
} else {
102+
mediator.setResultTarget(resultTargetAttr.getAttributeValue());
103+
}
104+
79105
Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q);
106+
if (!sequenceListElements.hasNext()) {
107+
String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator";
108+
throw new SynapseException(msg);
109+
}
80110
while (sequenceListElements.hasNext()) {
81111
OMElement sequence = (OMElement) sequenceListElements.next();
82112
if (sequence != null) {
@@ -97,6 +127,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
97127
} catch (JaxenException e) {
98128
handleException("Unable to load the aggregating expression", e);
99129
}
130+
} else {
131+
String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
132+
throw new SynapseException(msg);
100133
}
101134

102135
OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION);
@@ -123,6 +156,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
123156
if (maxMessages != null) {
124157
mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement));
125158
}
159+
} else {
160+
String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator";
161+
throw new SynapseException(msg);
126162
}
127163
addAllCommentChildrenToList(elem, mediator.getCommentsList());
128164
return mediator;

modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public OMElement serializeSpecificMediator(Mediator m) {
4343

4444
scatterGatherElement.addAttribute(fac.createOMAttribute(
4545
"parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution())));
46+
scatterGatherElement.addAttribute(fac.createOMAttribute(
47+
"result-target", nullNS, scatterGatherMediator.getResultTarget()));
48+
scatterGatherElement.addAttribute(fac.createOMAttribute(
49+
"content-type", nullNS, scatterGatherMediator.getContentType()));
50+
4651
OMElement aggregationElement = fac.createOMElement("aggregation", synNS);
4752

4853
SynapsePathSerializer.serializePath(

modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void run() {
154154
debugManager.advertiseMediationFlowTerminatePoint(synCtx);
155155
debugManager.releaseMediationFlowLock();
156156
}
157-
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
157+
if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) {
158158
this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx);
159159
}
160160
}

modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -521,16 +521,4 @@ public void setComponentStatisticsId(ArtifactHolder holder) {
521521
StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder);
522522
}
523523
}
524-
525-
/**
526-
* Check whether the message is a scatter message or not
527-
*
528-
* @param synCtx MessageContext
529-
* @return true if the message is a scatter message
530-
*/
531-
private static boolean isScatterMessage(MessageContext synCtx) {
532-
533-
Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
534-
return isSkipContinuationState != null && isSkipContinuationState;
535-
}
536524
}

modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
189189
}
190190
}
191191

192-
private static boolean isBody(OMElement body, OMElement enrichingElement) {
192+
public static boolean isBody(OMElement body, OMElement enrichingElement) {
193193
try {
194194
return (body.getLocalName().equals(enrichingElement.getLocalName()) &&
195195
body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI()));
@@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) {
198198
}
199199
}
200200

201-
private static boolean checkNotEmpty(List list) {
201+
public static boolean checkNotEmpty(List list) {
202202
return list != null && !list.isEmpty();
203203
}
204204

0 commit comments

Comments
 (0)