Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve invoke mediator to support connector response model #2283

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE
public static final String OAUTH_TAG = "oauth";
public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES";
public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER";
public static final String RESPONSE_VARIABLE = "responseVariable";
public static final String OVERWRITE_BODY = "overwriteBody";
public static final String ORIGINAL_PAYLOAD = "ORIGINAL_PAYLOAD";

public static final String DEFAULT_ERROR_TYPE = "ANY";
public static final String ERROR_STATS_REPORTED = "ERROR_STATS_REPORTED";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.synapse.mediators.elementary.EnrichMediator;
import org.apache.synapse.mediators.elementary.Source;
import org.apache.synapse.mediators.elementary.Target;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.jaxen.JaxenException;

import javax.xml.namespace.QName;
Expand Down Expand Up @@ -110,7 +110,7 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
OMElement targetEle = elem.getFirstChildWithName(TARGET_Q);
if (targetEle != null) {
if (sourceEle == null) {
Source source = CallMediatorEnrichUtil.createSourceWithBody();
Source source = MediatorEnrichUtil.createSourceWithBody();
callMediator.setSourceAvailable(true);
callMediator.setSourceForOutboundPayload(source);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
private void populateSource(CallMediator callMediator, Source source, OMElement sourceEle) {
OMAttribute typeAttr = sourceEle.getAttribute(ATT_TYPE);
if (typeAttr != null && typeAttr.getAttributeValue() != null) {
source.setSourceType(CallMediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue()));
source.setSourceType(MediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue()));
}

OMAttribute contentTypeAtt = sourceEle.getAttribute(CONTENT_TYPE);
Expand Down Expand Up @@ -220,7 +220,7 @@ private void populateTarget(CallMediator callMediator, Target target, OMElement
OMAttribute typeAttr = sourceEle.getAttribute(ATT_TYPE);
target.setAction("replace");
if (typeAttr != null && typeAttr.getAttributeValue() != null) {
int type = CallMediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue());
int type = MediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue());
if (type >= 0) {
target.setTargetType(type);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.apache.synapse.transport.util.MessageHandlerProvider;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.unittest.UnitTestingExecutor;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.concurrent.InboundThreadPool;
import org.apache.synapse.util.concurrent.SynapseThreadPool;
import org.apache.synapse.util.logging.LoggingUtils;
Expand Down Expand Up @@ -879,26 +879,26 @@ private void callMediatorPostMediate(MessageContext response) {
Target targetForResponsePayload;

if (isTargetAvailable) {
CallMediatorEnrichUtil.buildMessage(response);
MediatorEnrichUtil.buildMessage(response);
}
if (isTargetAvailable && isSourceAvailable) {
sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
sourceForOriginalPayload = CallMediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
targetForResponsePayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil
sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
sourceForOriginalPayload = MediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
targetForResponsePayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil
.doEnrich(response, sourceForResponsePayload, targetForInboundPayload, sourceMessageType);
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload, originalMessageType);
CallMediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
MediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
MediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
if (sourceMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(((Axis2MessageContext) response).getAxis2MessageContext());
}
}
} else if (isTargetAvailable) {
sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
CallMediatorEnrichUtil
sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
MediatorEnrichUtil
.doEnrich(response, sourceForResponsePayload, targetForInboundPayload, sourceMessageType);
}
response.setProperty(IS_SOURCE_AVAILABLE, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.data.connector;

import java.util.Map;

public interface ConnectorResponse {

void setPayload(Object payload);

Object getPayload();

void setHeaders(Map<String, Object> headers);

Map<String, Object> getHeaders();

void setAttributes(Map<String, Object> attributes);

Map<String, Object> getAttributes();

void addAttribute(String key, Object value);

void removeAttribute(String key);

void addHeader(String key, String value);

void removeHeader(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.data.connector;

import java.util.HashMap;
import java.util.Map;

public class DefaultConnectorResponse implements ConnectorResponse {

private Object payload;
private Map<String, Object> headers = new HashMap<>();
private Map<String, Object> attributes = new HashMap<>();

@Override
public void setPayload(Object payload) {
this.payload = payload;
}

@Override
public Object getPayload() {
return payload;
}

@Override
public void setHeaders(Map<String, Object> headers) {
this.headers = headers;
}

@Override
public Map<String, Object> getHeaders() {
return headers;
}

@Override
public void setAttributes(Map<String, Object> attributes) {
this.attributes = attributes;
}

@Override
public Map<String, Object> getAttributes() {
return attributes;
}

@Override
public void addAttribute(String key, Object value) {
attributes.put(key, value);
}

@Override
public void removeAttribute(String key) {
attributes.remove(key);
}

@Override
public void addHeader(String key, String value) {
headers.put(key, value);
}

@Override
public void removeHeader(String key) {
headers.remove(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.synapse.mediators.elementary.Source;
import org.apache.synapse.mediators.elementary.Target;
import org.apache.synapse.message.senders.blocking.BlockingMsgSender;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.MessageHelper;

import java.io.IOException;
Expand Down Expand Up @@ -158,33 +158,33 @@ public boolean mediate(MessageContext synInCtx) {
sourceMessageType = originalMessageType;
}
if (isSourceAvailable) {
CallMediatorEnrichUtil.buildMessage(synInCtx);
MediatorEnrichUtil.buildMessage(synInCtx);
}
if (isSourceAvailable && isTargetAvailable) {
Source sourceForInboundPayload = CallMediatorEnrichUtil.createSourceWithBody();
Source sourceForInboundPayload = MediatorEnrichUtil.createSourceWithBody();
Target targetForOriginalPayload =
CallMediatorEnrichUtil.createTargetWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForOutboundPayload = CallMediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil.createTargetWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForOutboundPayload = MediatorEnrichUtil.createTargetWithBody();
sourceForInboundPayload.setClone(true);
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForInboundPayload, targetForOriginalPayload, originalMessageType);
if (!(EnrichMediator.BODY == sourceForOutboundPayload.getSourceType() &&
EnrichMediator.BODY == targetForOutboundPayload.getTargetType())) {
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForOutboundPayload, targetForOutboundPayload, getSourceMessageType());
}
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
MediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
if (originalMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(axis2MessageContext);
}
}
} else if (isSourceAvailable) {
Target targetForOutboundPayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil
Target targetForOutboundPayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForOutboundPayload, targetForOutboundPayload, getSourceMessageType());
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
MediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
if (originalMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(axis2MessageContext);
}
Expand Down Expand Up @@ -280,27 +280,27 @@ private boolean handleBlockingCall(MessageContext synInCtx, String originalMessa
public void postMediate(MessageContext response, String originalMessageType, String originalContentType,
Map originalTransportHeaders) {
if (isTargetAvailable()) {
CallMediatorEnrichUtil.buildMessage(response);
MediatorEnrichUtil.buildMessage(response);
}
if (isTargetAvailable && isSourceAvailable) {
Source sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
Source sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
Source sourceForOriginalPayload =
CallMediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForResponsePayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
MediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForResponsePayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
getSourceMessageType());
CallMediatorEnrichUtil.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload,
MediatorEnrichUtil.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload,
originalMessageType);
CallMediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
MediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
MediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
if (sourceMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(((Axis2MessageContext) response).getAxis2MessageContext());
}
}
} else if (isTargetAvailable) {
Source sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
CallMediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
Source sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
MediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
getSourceMessageType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseLog;
import org.apache.synapse.commons.json.Constants;
Expand All @@ -52,7 +51,7 @@
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.mediators.eip.EIPUtils;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.InlineExpressionUtil;
import org.apache.synapse.util.synapse.expression.constants.ExpressionConstants;
import org.apache.synapse.util.xpath.SynapseJsonPath;
Expand Down Expand Up @@ -253,7 +252,7 @@ public void insert(MessageContext synContext,
.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
JsonObject headers = EIPUtils.convertMapToJsonObj(transportHeaders);
result.put(ExpressionConstants.HEADERS, headers);
result.put(ExpressionConstants.ATTRIBUTES, CallMediatorEnrichUtil.populateTransportAttributes(synContext));
result.put(ExpressionConstants.ATTRIBUTES, MediatorEnrichUtil.populateTransportAttributes(synContext));
synContext.setVariable(key, result);
} else {
synLog.error("Action " + action + " is not supported when enriching variables");
Expand Down Expand Up @@ -508,7 +507,7 @@ public void insertJson(MessageContext synCtx, Object sourceJsonElement, SynapseL
.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
JsonObject headers = EIPUtils.convertMapToJsonObj(transportHeaders);
result.put(ExpressionConstants.HEADERS, headers);
result.put(ExpressionConstants.ATTRIBUTES, CallMediatorEnrichUtil.populateTransportAttributes(synCtx));
result.put(ExpressionConstants.ATTRIBUTES, MediatorEnrichUtil.populateTransportAttributes(synCtx));
synCtx.setVariable(key, result);
}
break;
Expand Down
Loading
Loading