Skip to content

Commit

Permalink
Merge pull request wso2#2080 from malakaganga/fix_passthrough_rem_dis…
Browse files Browse the repository at this point in the history
…card_master

Fix Pass-through Threads getting stuck due to request message discard
  • Loading branch information
malakaganga authored May 8, 2023
2 parents d8116c8 + 5cfebd3 commit 0e7d377
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,6 @@ public void run() {
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}
try {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
if (response.isForceShutdownConnectionOnComplete()) {
RelayUtils.discardRequestMessage(requestMessageContext);
}

if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class SourceContext {

private SourceConfiguration sourceConfiguration;

private boolean isPipeMarkedToBeConsumed = false;

private ProtocolState state = ProtocolState.REQUEST_READY;

private SourceRequest request;
Expand Down Expand Up @@ -70,6 +72,15 @@ public ProtocolState getState() {
return state;
}

public boolean isPipeMarkedToBeConsumed() {
return isPipeMarkedToBeConsumed;
}

public void setPipeMarkedToBeConsumed(boolean isPipeDiscarded) {
this.isPipeMarkedToBeConsumed = isPipeDiscarded;
}


public void setState(ProtocolState state) {
this.state = state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ public void inputReady(NHttpServerConnection conn,
ContentDecoder decoder) {
try {
ProtocolState protocolState = SourceContext.getState(conn);

if (protocolState != ProtocolState.REQUEST_HEAD
&& protocolState != ProtocolState.REQUEST_BODY) {
// This logic is added specifically here to avoid a race condition that can occur when
// inputReady is already called prior to suspendInput method is called in TargetHandler.
SourceContext sourceContext = (SourceContext)
conn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);
if (sourceContext != null && sourceContext.isPipeMarkedToBeConsumed()) {
return;
}
handleInvalidState(conn, "Request message body data received");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,32 +400,41 @@ public void responseReceived(NHttpClientConnection conn) {
MessageContext requestMsgContext = TargetContext.get(conn).getRequestMsgCtx();
// State is not REQUEST_DONE. i.e the request is not completely written. But the response is started
// receiving, therefore informing a write error has occurred. So the thread which is
// waiting on writing the request out, will get notified.
// waiting on writing the request out, will get notified. And we will proceed with the response
// regardless of the http status code. But mark target and source connections to be closed.
informWriterError(conn);
StatusLine errorStatus = response.getStatusLine();
/* We might receive a 404 or a similar type, even before we write the request body. */
if (errorStatus != null) {
if (errorStatus.getStatusCode() >= HttpStatus.SC_BAD_REQUEST) {
TargetContext.updateState(conn, ProtocolState.REQUEST_DONE);
conn.resetOutput();
TargetContext.updateState(conn, ProtocolState.REQUEST_DONE);
conn.resetOutput();
if (log.isDebugEnabled()) {
log.debug(conn + ": Received response with status code : " + response.getStatusLine()
.getStatusCode() + " in invalid state : " + connState.name());
}
if (errorStatus.getStatusCode() < HttpStatus.SC_BAD_REQUEST) {
log.warn(conn + ": Received a response but request is not completely written to the backend"
+ "with status code : " + response.getStatusLine()
.getStatusCode() + " in state : " + connState.name());
}
if (requestMsgContext != null) {
NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
//Suspend input to avoid invoking input ready method.
sourceConn.suspendInput();
SourceContext sourceContext = (SourceContext)sourceConn.getContext().getAttribute(TargetContext.CONNECTION_INFORMATION);
if (sourceContext != null) {
sourceContext.setPipeMarkedToBeConsumed(true);
}
SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE);
SourceContext.get(sourceConn).setShutDown(true);
}
} else {
if (log.isDebugEnabled()) {
log.debug(conn + ": Received response with status code : " + response.getStatusLine()
.getStatusCode() + " in invalid state : " + connState.name());
log.debug(conn + ": has not started any request");
}
if (requestMsgContext != null) {
NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
SourceContext.updateState(sourceConn, ProtocolState.REQUEST_DONE);
SourceContext.get(sourceConn).setShutDown(true);
}
} else {
if (log.isDebugEnabled()) {
log.debug(conn + ": has not started any request");
}
if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT) {
return; // ignoring the stale connection close
}
if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT) {
return; // ignoring the stale connection close
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ private int getKeepAliveTimeout(String keepAlive) {
}

public boolean isForceShutdownConnectionOnComplete() {

return forceShutdownConnectionOnComplete;
}
}

0 comments on commit 0e7d377

Please sign in to comment.