From 707df658998af0802d7614760f8de10437a623af Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Mon, 20 Nov 2017 14:19:46 +0100 Subject: [PATCH 1/6] Properly ignore duplicate bridge configuration across HA cluster during startup. --- .../bridge/admin/BridgeServiceManagerImpl.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/mq/main/bridge/bridge-admin/src/main/java/com/sun/messaging/bridge/admin/BridgeServiceManagerImpl.java b/mq/main/bridge/bridge-admin/src/main/java/com/sun/messaging/bridge/admin/BridgeServiceManagerImpl.java index 212f40489..7f6b73056 100644 --- a/mq/main/bridge/bridge-admin/src/main/java/com/sun/messaging/bridge/admin/BridgeServiceManagerImpl.java +++ b/mq/main/bridge/bridge-admin/src/main/java/com/sun/messaging/bridge/admin/BridgeServiceManagerImpl.java @@ -213,9 +213,14 @@ public synchronized void init(BridgeBaseContext ctx) throws Exception { } try { store.addJMSBridge(name, true, null); - } catch (DupKeyException e) { - _bc.logInfo(_bmr.getKString(_bmr.I_JMSBRIDGE_NOT_OWNER, name), null); - itr.remove(); + } catch (Exception e) { + // DupKeyException gets wrapped into a BrokerException + if (e instanceof DupKeyException || e.getCause() != null && e.getCause() instanceof DupKeyException) { + _bc.logInfo(_bmr.getKString(_bmr.I_JMSBRIDGE_NOT_OWNER, name), null); + itr.remove(); + } else { + throw e; + } } } jmsbridges = store.getJMSBridges(null); From 687190842adf6a2bea0b184c17ae775d10999ba7 Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Fri, 28 Sep 2018 23:58:09 +0200 Subject: [PATCH 2/6] Simulate error replies to clients: * takeover in progress * don't send replies --- .gitignore | 4 ++++ Jenkinsfile | 23 ++++++++++++++++++ .../common/handlers/HelloHandler.java | 19 ++++++++------- .../service/imq/IMQDualThreadConnection.java | 24 +++++++++++-------- 4 files changed, 51 insertions(+), 19 deletions(-) create mode 100644 Jenkinsfile diff --git a/.gitignore b/.gitignore index 59dd9685c..c4027cd41 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ **/target/** # Ignore examples .class **/src/share/java/examples/**/*.class +**/*.iml +.idea +binary +dist diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 000000000..397b77c7c --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,23 @@ +node('java') { + + stage('Provision tools') { + tool 'Maven 3.5.x' + tool 'JDK 8' + tool 'Ant-1.8.2 autoinstall' + } + + stage('Checkout') { + checkout scm + } + + withAnt(installation: 'Ant-1.8.2 autoinstall', jdk: 'JDK 8') { + withMaven(jdk: 'JDK 8', maven: 'Maven 3.5.x', mavenSettingsConfig: 'd36053eb-3f69-482a-8dcd-d7ea248c53ba') { + + stage('Build') { + sh "mvn -V -U -e clean package" + archive 'mq/dist/bundles/mq_5_1_1.zip' + } + + } + } +} diff --git a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java index f99f857ed..26acb0295 100644 --- a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java +++ b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java @@ -40,7 +40,7 @@ /* * @(#)HelloHandler.java 1.71 06/28/07 - */ + */ package com.sun.messaging.jmq.jmsserver.common.handlers; @@ -73,7 +73,7 @@ import com.sun.messaging.jmq.util.UID; import com.sun.messaging.jmq.util.GoodbyeReason; - +import static com.sun.messaging.jmq.jmsserver.comm.CommGlobals.IMQ; /** @@ -84,7 +84,7 @@ * (e.g. tcp does not need the HELLO message to set up a connection, since * each socket corresponds to a new connection) */ -public class HelloHandler extends PacketHandler +public class HelloHandler extends PacketHandler { private ConnectionManager connectionList; @@ -118,7 +118,7 @@ public class HelloHandler extends PacketHandler public static void DUMP(String title) { Globals.getLogger().log(Logger.DEBUG,title); Globals.getLogger().log(Logger.DEBUG,"------------------------"); - Globals.getLogger().log(Logger.DEBUG,"Number of connections is " + + Globals.getLogger().log(Logger.DEBUG,"Number of connections is " + Globals.getConnectionManager().getNumConnections(null)); List l = Globals.getConnectionManager().getConnectionList(null); for (int i=0; i < l.size(); i ++ ) { @@ -139,9 +139,9 @@ public HelloHandler(ConnectionManager list) /** * Method to handle HELLO messages */ - public boolean handle(IMQConnection con, Packet msg) - throws BrokerException - { + public boolean handle(IMQConnection con, Packet msg) + throws BrokerException + { if (DEBUG) { logger.log(Logger.DEBUGHIGH, "HelloHandler: handle() [ Received Hello Message]"); @@ -407,8 +407,9 @@ public boolean handle(IMQConnection con, Packet msg) } } - HAMonitorService hamonitor = Globals.getHAMonitorService(); - if (hamonitor != null && hamonitor.inTakeover()) { + HAMonitorService hamonitor = Globals.getHAMonitorService(); + if (hamonitor != null && hamonitor.inTakeover() + || Globals.getConfig().getBooleanProperty(IMQ + ".simulate.takeover") ) { if (((IMQService)con.getService()).getServiceType() != ServiceType.ADMIN) { status = Status.TIMEOUT; if (oldCID != null) { diff --git a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java index 8f3587786..8b3585e00 100644 --- a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java +++ b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java @@ -44,19 +44,10 @@ package com.sun.messaging.jmq.jmsserver.service.imq; -import java.io.IOException; -import java.security.Principal; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Queue; - -import javax.jms.JMSException; - import com.sun.messaging.jmq.io.Packet; +import com.sun.messaging.jmq.io.PacketDispatcher; import com.sun.messaging.jmq.io.PacketType; import com.sun.messaging.jmq.io.ReadWritePacket; -import com.sun.messaging.jmq.io.PacketDispatcher; import com.sun.messaging.jmq.jmsserver.Globals; import com.sun.messaging.jmq.jmsserver.core.Session; import com.sun.messaging.jmq.jmsserver.data.PacketRouter; @@ -71,6 +62,16 @@ import com.sun.messaging.jmq.util.lists.Reason; import com.sun.messaging.jmq.util.log.Logger; +import javax.jms.JMSException; +import java.io.IOException; +import java.security.Principal; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; + +import static com.sun.messaging.jmq.jmsserver.comm.CommGlobals.IMQ; + public class IMQDualThreadConnection extends IMQBasicConnection implements DirectBrokerConnection { class DummyQueue implements HandOffQueue @@ -288,6 +289,9 @@ public boolean isDirectBuffers() { public void sendControlMessage(Packet p) { + if (Globals.getConfig().getBooleanProperty(IMQ + ".simulate.noreply")) { + return; + } if (p.getPacketType() > PacketType.MESSAGE) { p.setIP(ipAddress); p.setPort(0); From 2063afb45b02ce017ab3c89068c253cf8cfb6ef8 Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Sat, 29 Sep 2018 00:03:13 +0200 Subject: [PATCH 3/6] Simulate error replies to clients: * takeover in progress * don't send replies --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 397b77c7c..6c87d04dc 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -14,7 +14,7 @@ node('java') { withMaven(jdk: 'JDK 8', maven: 'Maven 3.5.x', mavenSettingsConfig: 'd36053eb-3f69-482a-8dcd-d7ea248c53ba') { stage('Build') { - sh "mvn -V -U -e clean package" + sh "cd mq; mvn -V -U -e clean package" archive 'mq/dist/bundles/mq_5_1_1.zip' } From 097213e8277cbae7870b5cee5a651a02271a954b Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Sat, 29 Sep 2018 00:48:49 +0200 Subject: [PATCH 4/6] Simulate error replies to clients: * takeover in progress * don't send replies --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 6c87d04dc..ac1fe8c77 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -15,7 +15,7 @@ node('java') { stage('Build') { sh "cd mq; mvn -V -U -e clean package" - archive 'mq/dist/bundles/mq_5_1_1.zip' + archive 'mq/dist/bundles/mq5_1_1.zip' } } From 5cd9bc2d1835b4e8a13e88ba0f28b3a065949cf7 Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Sat, 29 Sep 2018 00:49:15 +0200 Subject: [PATCH 5/6] Simulate error replies to clients: * takeover in progress * don't send replies --- Jenkinsfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Jenkinsfile b/Jenkinsfile index ac1fe8c77..8fd5577a4 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -16,6 +16,7 @@ node('java') { stage('Build') { sh "cd mq; mvn -V -U -e clean package" archive 'mq/dist/bundles/mq5_1_1.zip' + archive 'mq/dist/bundles/mq5_1_1.tar' } } From e5211a50fd06494b5d330fe9e67b39d91f5770a3 Mon Sep 17 00:00:00 2001 From: Alban Fonrouge Date: Sat, 29 Sep 2018 21:09:51 +0200 Subject: [PATCH 6/6] Simulate error replies to clients: * takeover in progress * don't send replies --- .../common/handlers/HelloHandler.java | 2 +- .../service/imq/IMQDualThreadConnection.java | 2 +- .../service/imq/IMQIPConnection.java | 387 +++++++++--------- 3 files changed, 203 insertions(+), 188 deletions(-) diff --git a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java index 26acb0295..ec5461cbe 100644 --- a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java +++ b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/common/handlers/HelloHandler.java @@ -409,7 +409,7 @@ public boolean handle(IMQConnection con, Packet msg) HAMonitorService hamonitor = Globals.getHAMonitorService(); if (hamonitor != null && hamonitor.inTakeover() - || Globals.getConfig().getBooleanProperty(IMQ + ".simulate.takeover") ) { + || !con.isAdminConnection() && Globals.getConfig().getBooleanProperty(IMQ + ".simulate.takeover") ) { if (((IMQService)con.getService()).getServiceType() != ServiceType.ADMIN) { status = Status.TIMEOUT; if (oldCID != null) { diff --git a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java index 8b3585e00..b751bd1f4 100644 --- a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java +++ b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQDualThreadConnection.java @@ -289,7 +289,7 @@ public boolean isDirectBuffers() { public void sendControlMessage(Packet p) { - if (Globals.getConfig().getBooleanProperty(IMQ + ".simulate.noreply")) { + if (!isAdminConnection() && Globals.getConfig().getBooleanProperty(IMQ + ".simulate.noreply")) { return; } if (p.getPacketType() > PacketType.MESSAGE) { diff --git a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQIPConnection.java b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQIPConnection.java index cded9067b..22a47d75a 100644 --- a/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQIPConnection.java +++ b/mq/main/mq-broker/broker-core/src/main/java/com/sun/messaging/jmq/jmsserver/service/imq/IMQIPConnection.java @@ -40,47 +40,59 @@ /* * @(#)IMQIPConnection.java 1.10 11/06/07 - */ + */ package com.sun.messaging.jmq.jmsserver.service.imq; -import java.net.*; -import java.util.*; -import java.nio.channels.*; -import java.nio.channels.spi.*; -import java.io.*; - -import com.sun.messaging.jmq.jmsserver.service.*; +import com.sun.messaging.jmq.io.BigPacketException; +import com.sun.messaging.jmq.io.Packet; +import com.sun.messaging.jmq.io.PacketType; +import com.sun.messaging.jmq.io.Status; import com.sun.messaging.jmq.jmsserver.Globals; -import com.sun.messaging.jmq.util.log.Logger; -import com.sun.messaging.jmq.jmsserver.resources.BrokerResources; - -import java.security.Principal; import com.sun.messaging.jmq.jmsserver.auth.AccessController; - -import com.sun.messaging.jmq.io.*; -import com.sun.messaging.jmq.jmsserver.service.MetricManager; - -import com.sun.messaging.jmq.util.admin.ConnectionInfo; - -import com.sun.messaging.jmq.util.timer.MQTimer; - -import com.sun.messaging.jmq.util.GoodbyeReason; - import com.sun.messaging.jmq.jmsserver.core.PacketReference; import com.sun.messaging.jmq.jmsserver.core.Session; -import com.sun.messaging.jmq.jmsserver.memory.*; import com.sun.messaging.jmq.jmsserver.data.PacketRouter; -import com.sun.messaging.jmq.util.net.IPAddress; +import com.sun.messaging.jmq.jmsserver.memory.MemoryCallback; +import com.sun.messaging.jmq.jmsserver.memory.MemoryGlobals; +import com.sun.messaging.jmq.jmsserver.net.ProtocolStreams; +import com.sun.messaging.jmq.jmsserver.resources.BrokerResources; +import com.sun.messaging.jmq.jmsserver.service.Connection; +import com.sun.messaging.jmq.jmsserver.service.ConnectionManager; +import com.sun.messaging.jmq.jmsserver.service.MetricManager; +import com.sun.messaging.jmq.jmsserver.service.Service; import com.sun.messaging.jmq.jmsserver.util.BrokerException; import com.sun.messaging.jmq.jmsserver.util.BrokerShutdownRuntimeException; -import com.sun.messaging.jmq.jmsserver.net.*; -import com.sun.messaging.jmq.jmsserver.service.ConnectionManager; -import com.sun.messaging.jmq.util.lists.*; +import com.sun.messaging.jmq.util.GoodbyeReason; +import com.sun.messaging.jmq.util.admin.ConnectionInfo; +import com.sun.messaging.jmq.util.lists.EventType; +import com.sun.messaging.jmq.util.lists.NFLPriorityFifoSet; +import com.sun.messaging.jmq.util.lists.Reason; +import com.sun.messaging.jmq.util.log.Logger; +import com.sun.messaging.jmq.util.net.IPAddress; +import com.sun.messaging.jmq.util.timer.MQTimer; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StreamCorruptedException; +import java.net.InetAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractSelectableChannel; +import java.security.Principal; +import java.util.Collections; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Set; +import java.util.TimerTask; +import java.util.Vector; + +import static com.sun.messaging.jmq.jmsserver.comm.CommGlobals.IMQ; -public class IMQIPConnection extends IMQBasicConnection +public class IMQIPConnection extends IMQBasicConnection implements Operation, MemoryCallback { @@ -89,16 +101,16 @@ public class IMQIPConnection extends IMQBasicConnection /** * a value of -1 or 0 turns off feature */ - public static final int closeInterval = Globals.getConfig().getIntProperty(Globals.IMQ + + public static final int closeInterval = Globals.getConfig().getIntProperty(Globals.IMQ + ".ping.close.interval", 5); - public static final boolean enablePingReply = Globals.getConfig().getBooleanProperty(Globals.IMQ + + public static final boolean enablePingReply = Globals.getConfig().getBooleanProperty(Globals.IMQ + ".ping.reply.enable", true); protected int ctrlPktsToConsumer = 0; - boolean STREAMS = true; - boolean BLOCKING = false; + boolean STREAMS = true; + boolean BLOCKING = false; protected Set tmpDestinations = Collections.synchronizedSet(new HashSet()); @@ -112,7 +124,7 @@ public class IMQIPConnection extends IMQBasicConnection // XXX-CODE TO OVERRIDE BEHAVIOR OF PACKETS - // to override the type of packet ... + // to override the type of packet ... // jmq.packet.[ctrl|read|fill].override = [direct/heap/unset] // direct -> always use direct packets // heap -> always use heap packets @@ -145,7 +157,7 @@ public class IMQIPConnection extends IMQBasicConnection "DEBUG: Overriding ctrl message " + " packet behavior to HEAP BUFFERS"); } else { - Globals.getLogger().log(Logger.ERROR, + Globals.getLogger().log(Logger.ERROR, "DEBUG: Can not determine behavior from " +" imq.packet.ctrl.override = "+ctrlover +" not one of the valid setting [heap,direct]"); @@ -168,7 +180,7 @@ public class IMQIPConnection extends IMQBasicConnection "DEBUG: Overriding read packet " + " behavior to HEAP BUFFERS"); } else { - Globals.getLogger().log(Logger.ERROR, + Globals.getLogger().log(Logger.ERROR, "DEBUG: Can not determine behavior from " + " imq.packet.read.override = "+readover +" not one of the valid setting [heap,direct]"); @@ -192,7 +204,7 @@ public class IMQIPConnection extends IMQBasicConnection "DEBUG: Overriding fill packet " + " behavior to HEAP BUFFERS"); } else { - Globals.getLogger().log(Logger.ERROR, + Globals.getLogger().log(Logger.ERROR, "DEBUG: Can not determine " + " behavior from jmq.packet.fill.override = " +fillover @@ -229,7 +241,7 @@ public boolean cancel() { public void run() { con.checkConnection(state); } - } + } private StateWatcher stateWatcher = null; private long interval = DEFAULT_INTERVAL; @@ -258,8 +270,8 @@ public void run() { */ - public IMQIPConnection(Service svc, ProtocolStreams ps, - PacketRouter router) + public IMQIPConnection(Service svc, ProtocolStreams ps, + PacketRouter router) throws IOException, BrokerException { super(svc, router); @@ -289,7 +301,7 @@ public IMQIPConnection(Service svc, ProtocolStreams ps, ctrlEL = this.control.addEventListener(this,EventType.EMPTY, null); setConnectionState(Connection.STATE_CONNECTED); - waitingWritePkt = new Packet(OVERRIDE_FILL_PACKET + waitingWritePkt = new Packet(OVERRIDE_FILL_PACKET ? O_FILL_USE_DIRECT : !STREAMS); if (!isAdminConnection() && Globals.getMemManager() != null) @@ -324,12 +336,12 @@ public int getLocalPort() { return ps.getLocalPort(); } - /** + /** * The debug state of this object */ public synchronized Hashtable getDebugState() { Hashtable ht = super.getDebugState(); - ht.put("pkts[TOTAL](in,out) ", "("+msgsIn + "," + (ctrlPktsToConsumer + + ht.put("pkts[TOTAL](in,out) ", "("+msgsIn + "," + (ctrlPktsToConsumer + msgsToConsumer) + ")"); for (int i=0; i < pktsIn.length; i ++) { if (pktsIn[i] == 0 && pktsOut[i] == 0) @@ -366,7 +378,7 @@ public Vector getDebugMessages(boolean full) { } } return ht; - + } public ConnectionInfo getConnectionInfo() { @@ -375,7 +387,7 @@ public ConnectionInfo getConnectionInfo() { return coninfo; } - + public boolean useDirectBuffers() { return (OVERRIDE_CTRL_PACKET ? O_CTRL_USE_DIRECT : !STREAMS); // return !STREAMS; @@ -388,7 +400,7 @@ public boolean useDirectBuffers() { NotificationInfo ninfo = null; public synchronized AbstractSelectableChannel getChannel() { - if (ps == null) + if (ps == null) return null; return ps.getChannel(); } @@ -419,9 +431,9 @@ public boolean waitUntilDestroyed(long time) { * (if any) */ public synchronized void notifyRelease( - OperationRunnable runner, int events) + OperationRunnable runner, int events) { - /* + /* * the thread is giving us UP ... */ int release_events = 0; @@ -443,18 +455,18 @@ public synchronized void notifyRelease( public synchronized void waitForRelease(long timeout) { long waitt = 5000; - while (read_assigned != null || write_assigned != null) { + while (read_assigned != null || write_assigned != null) { if (timeout <= 0) { - Globals.getLogger().log(Logger.WARNING, + Globals.getLogger().log(Logger.WARNING, "Timeout in waiting for runnable threads release in "+this); return; } Globals.getLogger().log(Logger.INFO, "Waiting for runnable threads release in "+this); if (timeout < waitt) waitt = timeout; try { - wait(waitt); + wait(waitt); } catch (InterruptedException e) { - Globals.getLogger().log(Logger.WARNING, + Globals.getLogger().log(Logger.WARNING, "Interrupted in waiting for runnable threads release in "+this); return; } @@ -476,8 +488,8 @@ public synchronized OperationRunnable getWriteRunnable() { } public synchronized void threadAssigned( - OperationRunnable runner, int events) - throws IllegalAccessException + OperationRunnable runner, int events) + throws IllegalAccessException { int release_events = 0; if ((events & SelectionKey.OP_WRITE) != 0) { @@ -494,7 +506,7 @@ public synchronized void threadAssigned( } read_assigned = runner; } - + if (ninfo != null) { if (release_events != 0) ninfo.released(this, release_events); @@ -524,7 +536,7 @@ private String getKeyString(int events) { */ - public boolean process(int events, boolean wait) + public boolean process(int events, boolean wait) throws IOException { boolean didSomething = false; @@ -539,14 +551,14 @@ public boolean process(int events, boolean wait) processedLastIteration = false; if ((events & SelectionKey.OP_WRITE) != 0) { while (true) { - - if (writeData(wait) != - Operation.PROCESS_PACKETS_REMAINING) + + if (writeData(wait) != + Operation.PROCESS_PACKETS_REMAINING) { // wasnt able to write anymore // break out of the loop break; - } + } processedLastIteration = true; writecount++; } @@ -589,11 +601,11 @@ public String getRemoteConnectionString() { userString = principal.getName(); userset = true; } - } catch (BrokerException e) { + } catch (BrokerException e) { if (IMQBasicConnection.DEBUG) logger.log(Logger.DEBUG,"Exception getting authentication name " + conId, e ); - + } } @@ -625,15 +637,15 @@ protected String localServiceString() { // ------------------------------------------------------------------------- public synchronized void closeConnection( - boolean force, int reason, String reasonStr) - { + boolean force, int reason, String reasonStr) + { if (state >= Connection.STATE_CLOSED) { logger.log(logger.DEBUG,"Requested close of already closed connection:" + this); return; } - + stopConnection(); if (Globals.getMemManager() != null) Globals.getMemManager().removeMemoryCallback(this); @@ -641,11 +653,11 @@ public synchronized void closeConnection( sayGoodbye(false, reason, reasonStr); flushControl(1000); } - + // CR 6798464: Don't mark connection as closed until we've flushed the queue and sent the GOODBYE state = Connection.STATE_CLOSED; notifyConnectionClosed(); - + // clean up everything this.control.removeEventListener(ctrlEL); cleanup(reason == GoodbyeReason.SHUTDOWN_BKR); @@ -690,7 +702,7 @@ protected void cleanupControlPackets(boolean shutdown) { * destroy the connection to the client * clearing out messages, etc */ - public void destroyConnection(boolean force, int reason, String reasonstr) { + public void destroyConnection(boolean force, int reason, String reasonstr) { int oldstate = 0; boolean destroyOK = false; try { @@ -699,16 +711,16 @@ public void destroyConnection(boolean force, int reason, String reasonstr) { oldstate = state; if (state >= Connection.STATE_DESTROYING) return; - + if (state < Connection.STATE_CLOSED) { closeConnection(force, reason, reasonstr); } - + setConnectionState(Connection.STATE_DESTROYING); } Globals.getConnectionManager().removeConnection(getConnectionUID(), force, reason, reasonstr); - + if (accessController.isAuthenticated()) { accessController.logout(); } @@ -747,8 +759,8 @@ public void destroyConnection(boolean force, int reason, String reasonstr) { destroyOK = true; wakeup(); } finally { - if (!destroyOK && reason != GoodbyeReason.SHUTDOWN_BKR - && (Globals.getMemManager() == null + if (!destroyOK && reason != GoodbyeReason.SHUTDOWN_BKR + && (Globals.getMemManager() == null || Globals.getMemManager().getCurrentLevel() > 0)) { state = oldstate; @@ -756,8 +768,8 @@ public void destroyConnection(boolean force, int reason, String reasonstr) { destroyRecurse ++; destroyConnection(force, reason, reasonstr); } - } - + } + // free the lock Globals.getClusterBroadcast().connectionClosed( getConnectionUID(), isAdminConnection()); @@ -765,10 +777,10 @@ public void destroyConnection(boolean force, int reason, String reasonstr) { } /** - * sets the connection state + * sets the connection state * @return false if connection being destroyed */ - public boolean setConnectionState(int state) { + public boolean setConnectionState(int state) { synchronized (timerLock) { this.state = state; if (this.state >= Connection.STATE_CLOSED) { @@ -796,7 +808,7 @@ public boolean setConnectionState(int state) { logger.log(Logger.DEBUG,"InternalError: timer canceled ", ex); } - } else if (state == Connection.STATE_INITIALIZED + } else if (state == Connection.STATE_INITIALIZED || state == Connection.STATE_AUTH_REQUESTED || state == Connection.STATE_AUTH_RESPONSED) { if (stateWatcher != null) { @@ -825,8 +837,8 @@ public boolean setConnectionState(int state) { } catch (IllegalStateException ex) { logger.log(Logger.DEBUG,"InternalError: timer canceled ", ex); } - } else if (state >= Connection.STATE_AUTHENTICATED - || state == Connection.STATE_UNAVAILABLE) + } else if (state >= Connection.STATE_AUTHENTICATED + || state == Connection.STATE_UNAVAILABLE) { if (stateWatcher != null) { try { @@ -883,11 +895,14 @@ public void logConnectionInfo(boolean closing, String reason) { * @param msg message to send back to the client */ public void sendControlMessage(Packet msg) { + if (!isAdminConnection() && Globals.getConfig().getBooleanProperty(IMQ + ".simulate.noreply")) { + return; + } if (!isValid() && msg.getPacketType() != PacketType.GOODBYE ) { logger.log(Logger.INFO,"Internal Warning: message " + msg + "queued on destroyed connection " + this); } - + control.add(msg); synchronized (control) { hasCtrl = !control.isEmpty(); @@ -897,8 +912,8 @@ public void sendControlMessage(Packet msg) { protected void sendControlMessage(Packet msg, boolean priority) { if (IMQBasicConnection.DEBUG) { - logger.log(Logger.DEBUGHIGH, - "IMQIPConnection[ {0} ] queueing Admin packet {1}", + logger.log(Logger.DEBUGHIGH, + "IMQIPConnection[ {0} ] queueing Admin packet {1}", this.toString(), msg.toString()); } if (!isValid()) { // we are being destroyed @@ -912,7 +927,7 @@ protected void sendControlMessage(Packet msg, boolean priority) } - /** + /** * Flush all control messages on this connection to * the client. * @param timeout the lenght of time to try and flush the @@ -925,7 +940,7 @@ public void flushControl(long timeout) { return; } - synchronized (flushCtrlLock) { + synchronized (flushCtrlLock) { if (IMQBasicConnection.DEBUG) { logger.log(Logger.DEBUG, "Flushing Control Messages with timeout of " + timeout); @@ -954,7 +969,7 @@ public void flushControl(long timeout) { } catch (InterruptedException ex) { // no reason to do anything w/ it } - if (flushCtrl && timeout > 0 && + if (flushCtrl && timeout > 0 && System.currentTimeMillis() >= time+timeout) break; } @@ -962,7 +977,7 @@ public void flushControl(long timeout) { if (IMQBasicConnection.DEBUG) { if (flush) { logger.log(Logger.DEBUG, - "Control Flush did not complete in timeout of " + "Control Flush did not complete in timeout of " + timeout); } else { logger.log(Logger.DEBUG, @@ -999,7 +1014,7 @@ protected void localFlush() { } - /** + /** * Flush all control and JMS messages on this connection to * the client. * @param timeout the lenght of time to try and flush the @@ -1010,14 +1025,14 @@ public void flush(long timeout) { localFlush(); return; } - if ( !inCtrlWrite && control.isEmpty() + if ( !inCtrlWrite && control.isEmpty() && !inJMSWrite && hasBusySessions() - && !flushCritical && !lockCritical) + && !flushCritical && !lockCritical) { // nothing to do return; } - synchronized (flushLock) { + synchronized (flushLock) { if (IMQBasicConnection.DEBUG) { logger.log(Logger.DEBUG, "Flushing Messages with timeout of " + timeout); @@ -1028,7 +1043,7 @@ public void flush(long timeout) { // window we should still be woken up w/ the ctrl // notify -> since that happens AFTER a message is // removed from the list - if ( !inCtrlWrite && control.isEmpty() + if ( !inCtrlWrite && control.isEmpty() && !inJMSWrite && hasBusySessions() && !flushCritical && !lockCritical) { @@ -1047,7 +1062,7 @@ public void flush(long timeout) { } catch (InterruptedException ex) { // valid, no reason to check } - if (flush && timeout > 0 && + if (flush && timeout > 0 && System.currentTimeMillis() >= time+timeout) break; } @@ -1088,18 +1103,18 @@ void checkConnection(int state) { String[] args = {toString(), getConnectionStateString(this.state), getConnectionStateString(state), String.valueOf(interval)}; - synchronized (this) { - if (this.state >= state) - { + synchronized (this) { + if (this.state >= state) + { return; } - if (this.state >= Connection.STATE_CLOSED - || this.state == Connection.STATE_UNAVAILABLE) + if (this.state >= Connection.STATE_CLOSED + || this.state == Connection.STATE_UNAVAILABLE) { return; } - logger.log(Logger.WARNING, + logger.log(Logger.WARNING, Globals.getBrokerResources().getKString( BrokerResources.W_CONNECTION_TIMEOUT, args)); } @@ -1112,7 +1127,7 @@ void checkConnection(int state) { // dont bother being nice destroyConnection(false, GoodbyeReason.CON_FATAL_ERROR, Globals.getBrokerResources().getKString( - BrokerResources.W_CONNECTION_TIMEOUT, args)); + BrokerResources.W_CONNECTION_TIMEOUT, args)); } @@ -1138,11 +1153,11 @@ void checkConnection(int state) { // new method to handle how we get the packet // this is overridden in Embedded more to get it from a queue - protected boolean readInPacket(Packet p) + protected boolean readInPacket(Packet p) throws IllegalArgumentException, StreamCorruptedException, BigPacketException, IOException { boolean OK = true; - + if (STREAMS) { assert is!= null; readpkt.readPacket(is); @@ -1170,33 +1185,33 @@ public int readData() if (IMQBasicConnection.DEBUG || DUMP_PACKET || IN_DUMP_PACKET) { logger.log(Logger.DEBUG, - "Reading from " + getClass() + "{0} ", - this.toString() + "Reading from " + getClass() + "{0} ", + this.toString() + Thread.currentThread()); } - + if (!isValid()) { if (IMQBasicConnection.DEBUG) { logger.log(Logger.DEBUG, - "Invalid Connection {0} ", - this.toString() + + "Invalid Connection {0} ", + this.toString() + Thread.currentThread()); } throw new IOException( "Connection has been closed " + this); } - - + + try { if (readpkt == null) { // heck its a new packet - readpkt = new Packet(OVERRIDE_READ_PACKET - ? O_READ_USE_DIRECT + readpkt = new Packet(OVERRIDE_READ_PACKET + ? O_READ_USE_DIRECT : !STREAMS); readpkt.generateSequenceNumber(false); readpkt.generateTimestamp(false); if (IMQBasicConnection.DEBUG) { - logger.log(Logger.DEBUG, - "IMQIPConnection {0} getting a new read packet {1} ", + logger.log(Logger.DEBUG, + "IMQIPConnection {0} getting a new read packet {1} ", this.toString(), readpkt.toString()); } } @@ -1214,12 +1229,12 @@ public int readData() if (isValid()) { try { boolean OK = true; - + OK = readInPacket(readpkt); msgsIn ++; if (readpkt.getPacketType() < PacketType.LAST) pktsIn[readpkt.getPacketType()] ++; - + if (!OK) { // we didnt finish reading return Operation.PROCESS_WRITE_INCOMPLETE; @@ -1233,18 +1248,18 @@ public int readData() // not just packet version if (packetVersion < CURVERSION) - convertPkt = new ConvertPacket(this, + convertPkt = new ConvertPacket(this, packetVersion, CURVERSION); } // convert to new packet type if necessary if (convertPkt != null) - convertPkt.handleReadPacket(readpkt); - + convertPkt.handleReadPacket(readpkt); + } catch (IllegalArgumentException ex) { handleIllegalArgumentExceptionPacket(readpkt, ex); throw ex; - + } catch (OutOfMemoryError ex) { // Dump header to help with debugging (i.e. was it an // unusually large packet? Corrupted read?) @@ -1252,17 +1267,17 @@ public int readData() Globals.getBrokerResources().getKString( BrokerResources.M_LOW_MEMORY_READALLOC) + ": " + readpkt.headerToString()); - - // re-read the packet ... + + // re-read the packet ... // in 99.??? % of the time, we just lost a packet - // + // // if we fail a second time or get an unexpected // error ... its fatal for the connection boolean OK = readInPacket(readpkt); if (!OK) { // we didnt finish reading return Operation.PROCESS_WRITE_INCOMPLETE; } - + } catch (StreamCorruptedException ex) { String connStr = getRemoteConnectionString(); logger.logStack(Logger.WARNING, @@ -1285,7 +1300,7 @@ public int readData() readpkt = clearReadPacket(readpkt); return Operation.PROCESS_PACKETS_REMAINING; } - + if (Globals.getConnectionManager().PING_ENABLED) { updateAccessTime(true); } @@ -1293,16 +1308,16 @@ public int readData() countInPacket(readpkt); } } - + if (IMQBasicConnection.DEBUG || DUMP_PACKET || IN_DUMP_PACKET) { int flag = (DUMP_PACKET || IN_DUMP_PACKET) ? Logger.INFO : Logger.DEBUGHIGH; logger.log(flag, "\n------------------------------" + "\nReceived incoming Packet - Dumping" - + "\nConnection: " + this + + "\nConnection: " + this + "\n------------------------------" - + "\n" + readpkt.dumpPacketString(">>>>****") + + "\n" + readpkt.dumpPacketString(">>>>****") + "\n------------------------------"); } @@ -1325,13 +1340,13 @@ public int readData() router.handleMessage(this, readpkt); readpkt = clearReadPacket(readpkt); } catch (OutOfMemoryError ex) { - logger.logStack(Logger.ERROR, - BrokerResources.E_LOW_MEMORY_FAILED, + logger.logStack(Logger.ERROR, + BrokerResources.E_LOW_MEMORY_FAILED, ex); throw ex; } - } finally { + } finally { setCritical(false); } return Operation.PROCESS_PACKETS_REMAINING; @@ -1341,7 +1356,7 @@ public int readData() inReadProcess = false; } - } + } // ------------------------------------------------------------------------- @@ -1404,7 +1419,7 @@ public int writeData(boolean wait) inWriteProcess = true; if (IMQBasicConnection.DEBUG) { - logger.log(Logger.DEBUG, + logger.log(Logger.DEBUG, "Writing IMQIPConnection {0} ", this.toString()); } @@ -1421,9 +1436,9 @@ public int writeData(boolean wait) } if (!isBusy()) { return Operation.PROCESS_PACKETS_COMPLETE; - } + } + - flushCritical = true; if ((!inCtrlWrite) && (!inJMSWrite)) { @@ -1448,7 +1463,7 @@ public int writeData(boolean wait) } // first convert it if (convertPkt != null) - convertPkt.handleWritePacket(ctrlpkt); + convertPkt.handleWritePacket(ctrlpkt); if (IMQBasicConnection.DEBUG || DUMP_PACKET || OUT_DUMP_PACKET) { dumpControlPacket(ctrlpkt); } @@ -1490,20 +1505,20 @@ public int writeData(boolean wait) return Operation.PROCESS_PACKETS_COMPLETE; } - } + } // the broker is no longer in a critical state flushCritical = false; // OK .. now try normal messages - + if (IMQBasicConnection.DEBUG) { - logger.log(Logger.DEBUGHIGH, + logger.log(Logger.DEBUGHIGH, "IMQIPConnection[ {0} ] - processing " + " normal msg queue", this.toString()); } - + // we shouldnt be here if we are paused or waiting for a resume @@ -1535,14 +1550,14 @@ public int writeData(boolean wait) // restart the resume flow // LKS - XXX - convertPkt.handleWritePacket(waitingWritePkt); + convertPkt.handleWritePacket(waitingWritePkt); } // check for connection flow control sent_count ++; - boolean aboutToWaitForRF = flowCount != 0 && - sent_count >= flowCount; + boolean aboutToWaitForRF = flowCount != 0 && + sent_count >= flowCount; if (aboutToWaitForRF) { sent_count = 0; @@ -1552,10 +1567,10 @@ public int writeData(boolean wait) if (IMQBasicConnection.DEBUG || DUMP_PACKET || OUT_DUMP_PACKET) { int flag = (DUMP_PACKET || OUT_DUMP_PACKET) ? Logger.INFO : Logger.DEBUGHIGH; - - logger.log(flag, + + logger.log(flag, "\n------------------------------" - +"\nSending JMS Packet -[block = "+BLOCKING + +"\nSending JMS Packet -[block = "+BLOCKING + ",nio = "+!STREAMS+"] " + this + " Dumping" + "\n" + waitingWritePkt.dumpPacketString("<<<<****") + "\n------------------------------"); @@ -1563,8 +1578,8 @@ public int writeData(boolean wait) } - - if (inJMSWrite) { + + if (inJMSWrite) { inJMSWrite = !writeOutPacket(waitingWritePkt); } lockCritical = false; @@ -1576,7 +1591,7 @@ public int writeData(boolean wait) if (waitingWritePkt.getPacketType() < PacketType.LAST) pktsOut[waitingWritePkt.getPacketType()] ++; - + if (Globals.getConnectionManager().PING_ENABLED) { updateAccessTime(false); } @@ -1597,21 +1612,21 @@ public int writeData(boolean wait) // connection is gone logger.log(logger.DEBUGMED, "closed connection " + this, ex); inJMSWrite = false; - destroyConnection(false, GoodbyeReason.CLIENT_CLOSED, + destroyConnection(false, GoodbyeReason.CLIENT_CLOSED, Globals.getBrokerResources().getKString( BrokerResources.M_CONNECTION_CLOSE)); - throw ex; + throw ex; } finally { inWriteProcess = false; - synchronized (flushCtrlLock) { + synchronized (flushCtrlLock) { if (flushCtrl) { - if ((ctrlpkt == null && control.isEmpty()) + if ((ctrlpkt == null && control.isEmpty()) || !isValid()) { if (IMQBasicConnection.DEBUG) { logger.log(Logger.DEBUG, - "Done flushing control messages on " + "Done flushing control messages on " + this); } flushCtrl = false; @@ -1622,11 +1637,11 @@ public int writeData(boolean wait) } } if (flush) { - synchronized (flushLock) { + synchronized (flushLock) { if (!isBusy() || !isValid()) { if (IMQBasicConnection.DEBUG) { logger.log(Logger.DEBUG, - "Done flushing control messages on " + "Done flushing control messages on " + this); } flush = false; @@ -1640,7 +1655,7 @@ public int writeData(boolean wait) waitingWritePkt = clearWritePacket(waitingWritePkt); } } - } + } assert false : " should never happen"; @@ -1652,10 +1667,10 @@ public int writeData(boolean wait) } protected void dumpControlPacket(Packet pkt) { - int loglevel = ((DUMP_PACKET || OUT_DUMP_PACKET) ? + int loglevel = ((DUMP_PACKET || OUT_DUMP_PACKET) ? Logger.INFO : Logger.DEBUGHIGH); logger.log(loglevel, "\n------------------------------" - +"\nSending Control Packet -[block = "+BLOCKING + +"\nSending Control Packet -[block = "+BLOCKING + ",nio = "+!STREAMS+"] Dumping" + "\n------------------------------" + "\n" + pkt.dumpPacketString("<<<<****") @@ -1682,9 +1697,9 @@ protected void checkState() { ((NotificationInfo)service).setReadyToWrite(this, busy); } } - } - } - + } + } + } @@ -1716,11 +1731,11 @@ protected void handleWriteException(Throwable ex) int count = 0; boolean firstpass = true; while (true) { - try { - logger.log(Logger.ERROR, + try { + logger.log(Logger.ERROR, Globals.getBrokerResources().getKString( BrokerResources.E_CLOSE_CONN_ON_OOM, this.toString())); - closeConnection(firstpass, + closeConnection(firstpass, GoodbyeReason.CON_FATAL_ERROR, ex.toString()); firstpass = false; break; @@ -1738,14 +1753,14 @@ protected void handleWriteException(Throwable ex) throw (IOException)ex; } else if (ex instanceof BrokerShutdownRuntimeException) { logger.log(Logger.INFO, ex.getMessage()); - closeConnection(true, + closeConnection(true, GoodbyeReason.SHUTDOWN_BKR, ex.toString()); - } else { + } else { logger.logStack(Logger.ERROR, "Internal Error: " + "Received unexpected exception processing connection " + " closing connection", ex); // something went wrong, close connection - closeConnection(true, + closeConnection(true, GoodbyeReason.CON_FATAL_ERROR, ex.toString()); } } @@ -1763,8 +1778,8 @@ protected void handleBigPacketException(Packet pkt, BigPacketException e) { protected void handleIllegalArgumentExceptionPacket( Packet pkt, IllegalArgumentException e) { - logger.log(Logger.ERROR, - "Bad version packet received: "+e.getMessage()+", reject connection ["+this+"]"); + logger.log(Logger.ERROR, + "Bad version packet received: "+e.getMessage()+", reject connection ["+this+"]"); // queue a HELLO_REPLY w/ error Packet reply = new Packet(useDirectBuffers()); @@ -1822,7 +1837,7 @@ protected void checkConnection() { boolean sendAck = false; if (enablePingReply && closeInterval > 0 && getClientProtocolVersion() >= PacketType.VERSION364 ) { sendAck = true; - + // see if we need to kill the connection // get access time long access = getLastResponseTime(); @@ -1834,9 +1849,9 @@ protected void checkConnection() { // if is, kill the connection if (delta >= interval) { logger.log(Logger.INFO, BrokerResources.W_UNRESPONSIVE_CONNECTION, - String.valueOf(this.getConnectionUID().longValue()), + String.valueOf(this.getConnectionUID().longValue()), String.valueOf(delta/1000)); - + destroyConnection(false,GoodbyeReason.ADMIN_KILLED_CON, "Connection unresponsive"); } @@ -1868,8 +1883,8 @@ public void updateMemory(int cnt, long memory, long max) { sendResume(cnt, memory, max, true); } - protected void sendResume(int cnt, long memory, - long max, boolean priority) + protected void sendResume(int cnt, long memory, + long max, boolean priority) { if (packetVersion < Packet.VERSION1) return; // older protocol cant handle resume @@ -1891,11 +1906,11 @@ protected void sendResume(int cnt, long memory, /** * called when either the session or the - * control message is busy + * control message is busy */ public void eventOccured(EventType type, Reason r, - Object target, Object oldval, Object newval, - Object userdata) + Object target, Object oldval, Object newval, + Object userdata) { // LKS - at this point, we are in a write lock @@ -1904,18 +1919,18 @@ public void eventOccured(EventType type, Reason r, synchronized (stateLock) { if (type == EventType.EMPTY) { - + // this can only be from the control queue assert target == control; assert newval instanceof Boolean; - - } else if (type == + + } else if (type == EventType.BUSY_STATE_CHANGED) { assert target instanceof Session; assert newval instanceof Boolean; - + Session s = (Session) target; - + synchronized(busySessions) { synchronized (s.getBusyLock()) { if (s.isBusy()) { @@ -1923,22 +1938,22 @@ public void eventOccured(EventType type, Reason r, } } } - + } checkState(); } } - private boolean fillNextPacket(Packet p) + private boolean fillNextPacket(Packet p) { Session s = null; - + synchronized(busySessions) { Iterator itr = busySessions.iterator(); while (itr.hasNext()) { s = (Session)itr.next(); itr.remove(); - if (s == null) + if (s == null) continue; synchronized (s.getBusyLock()) { if (s.isBusy()) { @@ -1946,7 +1961,7 @@ private boolean fillNextPacket(Packet p) break; } } - + } }