diff --git a/quickfixj-core/src/main/java/quickfix/Initiator.java b/quickfixj-core/src/main/java/quickfix/Initiator.java index 8eabeee725..50e9ad08f5 100644 --- a/quickfixj-core/src/main/java/quickfix/Initiator.java +++ b/quickfixj-core/src/main/java/quickfix/Initiator.java @@ -130,5 +130,11 @@ public interface Initiator extends Connector { * AbstractSocketInitiator.createDynamicSession is called */ String SETTING_DYNAMIC_SESSION = "DynamicSession"; - + /** + * Initiator setting for reconnect attempts. Only valid when + * session connection type is "initiator". + * + * @see quickfix.SessionFactory#SETTING_CONNECTION_TYPE + */ + String SETTING_RECONNECT_ATTEMPT = "ReconnectAttempt"; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index e276d4b8b9..7b3e0bc549 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -150,6 +150,7 @@ private void createInitiator(final Session session, final boolean continueInitOn String proxyDomain = null; int proxyPort = -1; + int retryCount = 1; if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) { proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE); @@ -173,6 +174,9 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST); proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); } + if (getSettings().isSetting(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT)){ + retryCount = settings.getInt(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT); + } ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService()); try { @@ -180,7 +184,7 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { socketAddresses, localAddress, connectTimeout, reconnectingIntervals, scheduledExecutorService, settings, networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, - proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); + proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, retryCount); initiators.add(ioSessionInitiator); } catch (ConfigError e) { diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index 96436e0837..8f5b02a7cc 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -70,7 +70,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, EventHandlingStrategy eventHandlingStrategy, IoFilterChainBuilder userIoFilterChainBuilder, boolean sslEnabled, SSLConfig sslConfig, String proxyType, String proxyVersion, String proxyHost, int proxyPort, - String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation) throws ConfigError { + String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation, int retryCount) throws ConfigError { this.executor = executor; final long connectTimeoutMillis = connectTimeout * 1000L; @@ -83,7 +83,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, reconnectTask = new ConnectTask(sslEnabled, socketAddresses, localAddress, userIoFilterChainBuilder, fixSession, connectTimeoutMillis, reconnectIntervalInMillis, sessionSettings, networkingOptions, eventHandlingStrategy, sslConfig, - proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log); + proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log, retryCount); } catch (GeneralSecurityException e) { throw new ConfigError(e); } @@ -111,6 +111,9 @@ private static class ConnectTask implements Runnable { private long lastConnectTime; private int nextSocketAddressIndex; private int connectionFailureCount; + private int retryCount = 1; + private int retryAttempt = 0; + private boolean isFirstTime = true; private ConnectFuture connectFuture; private final String proxyType; @@ -128,7 +131,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses, SessionSettings sessionSettings, NetworkingOptions networkingOptions, EventHandlingStrategy eventHandlingStrategy, SSLConfig sslConfig, String proxyType, String proxyVersion, String proxyHost, int proxyPort, String proxyUser, String proxyPassword, String proxyDomain, - String proxyWorkstation, Logger log) throws ConfigError, GeneralSecurityException { + String proxyWorkstation, Logger log, int retryCount) throws ConfigError, GeneralSecurityException { this.sslEnabled = sslEnabled; this.socketAddresses = socketAddresses; this.localAddress = localAddress; @@ -150,6 +153,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses, this.proxyPassword = proxyPassword; this.proxyDomain = proxyDomain; this.proxyWorkstation = proxyWorkstation; + this.retryCount = retryCount; setupIoConnector(); } @@ -224,7 +228,14 @@ public void run() { private void connect() { try { lastReconnectAttemptTime = SystemTime.currentTimeMillis(); - SocketAddress nextSocketAddress = getNextSocketAddress(); + SocketAddress nextSocketAddress = socketAddresses[getCurrentSocketAddressIndex()]; + if (retryCount == 1 || retryAttempt == retryCount || isFirstTime){ + nextSocketAddress = getNextSocketAddress(); + retryAttempt = 1; + isFirstTime = false; + } else { + ++retryAttempt; + } if (localAddress == null) { connectFuture = ioConnector.connect(nextSocketAddress); } else {