使用Spring Jms鏈接ActiveMQ 重連問題的一點心得

咱們如今作的Web項目遇到一個問題,項目啓動時使用Failover方式鏈接某個ActiveMQ,brokerUrl相似java

failOver:(tcp://xxx.xxx.xxx.xxx:61616),若是ActiveMQ啓動正常,項目啓動正常;spring

但若是ActiveMQ沒有啓動,項目將沒法啓動完成,一直卡在鏈接ActiveMQ階段。服務器

通過調查,發現是ActiveMQ提供的FailOver Transport方式致使的。tcp

FailOver Transport有maxReconnectAttempts和startupMaxReconnectAttempts兩個鏈接參數。ide

    前者是FailOver Transport最大的重連次數,後者是啓動階段的最大重連次數,因爲咱們原先的鏈接這兩個參數沒有設置值,它們都沿用默認值-1,表示無限次重連,直到鏈接成功爲止。這樣,只要咱們在啓動項目時不啓動ActiveMQ,FailOver Transport將一直嘗試重連,致使後續程序加載沒法進行,從而項目啓動卡住。ui

    找到了緣由以後,咱們在brokerUrl中設置startupMaxReconnectAttempts=2,形如:this

    failOver:(tcp://xxx.xxx.xxx.xxx:61616)?startupMaxReconnectAttempts=2,這樣啓動時FailOverTransport僅僅重試2次,若是不成功,就再也不嘗試,繼續加載後續程序,這樣就解決了項目啓動卡住的問題。代理

    然而還有一個問題沒有解決,若是啓動時沒有鏈接ActiveMQ成功,在項目啓動完成後,再啓動ActiveMQ時,即便ActiveMQConnectionFactory一直重連ActiveMQ,直到鏈接成功。咱們配置的DefaultMessageListenerContainer仍然會拋出如下異常:日誌

ERROR DefaultMessageListenerContainer - Could not refresh JMS Connection for destination 'queue://xxxxxxx' - retrying in 5000 ms. Cause: The JMS connection has failed: Connection refused: connect.code

   在StackOverflow上有人給出瞭解答,是由於DefaultMessageListenerContainer引用的

SingleConnectionFactory對象的reconnectOnException屬性。

(http://stackoverflow.com/questions/17729066/spring-mq-jms-reconnect-configuration)

   這個屬性的說明以下(基於Spring Framework 4.3.2 RELEASE版本)

   Specify whether the single Connection should be reset (to be subsequently renewed) when a JMSException is reported by the underlying Connection.

  這個屬性指定是否在底層的鏈接(這裏是指ActiveMQ鏈接)拋出JMSException的時候,對鏈接進行重置。

  它的默認值是false,意味着即便JMS異常拋出,SingleConnectionFactory自帶的connection沒有重置。

在調用getConnection()方法時,因爲鏈接沒有重置,this.connection != null,仍然返回舊的connection.

從而DefaultMessageListenerContainer引用的鏈接仍然失敗鏈接,不斷拋出異常

protected Connection getConnection() throws JMSException {
		synchronized (this.connectionMonitor) {
			if (this.connection == null) {
				initConnection();
			}
			return this.connection;
		}
	}

   當咱們設置這個屬性爲true後,在拋出Jms異常時,SingleConnectionFactory對象綁定的ExceptionListner代理對象會回調SingleConnectionFactory對象的onException方法,將connection屬性設置爲null,這樣在getConnection()方法時,會調用initConnection方法建立新的鏈接。這樣就能保證ActiveMQ啓動成功後,會有對應的成功鏈接被建立,從而使DefaultMessageListenerContainer引用到正常鏈接,再也不拋出異常。

public class SingleConnectionFactory ......
{

    @Override
	public void onException(JMSException ex) {
		logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
		resetConnection();
	}

    public void resetConnection() {
		synchronized (this.connectionMonitor) {
			if (this.connection != null) {
				closeConnection(this.connection);
			}
			this.connection = null;
		}
	}

    protected Connection getConnection() throws JMSException {
		synchronized (this.connectionMonitor) {
			if (this.connection == null) {
				initConnection();
			}
			return this.connection;
		}
	}

    public void initConnection() throws JMSException {
		if (getTargetConnectionFactory() == null) {
			throw new IllegalStateException(
					"'targetConnectionFactory' is required for lazily initializing a Connection");
		}
		synchronized (this.connectionMonitor) {
			if (this.connection != null) {
				closeConnection(this.connection);
			}
			this.connection = doCreateConnection();
			prepareConnection(this.connection);
			if (this.startedCount > 0) {
				this.connection.start();
			}
			if (logger.isInfoEnabled()) {
				logger.info("Established shared JMS Connection: " + this.connection);
			}
		}
	}
}

private class AggregatedExceptionListener implements ExceptionListener {

		final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);

		@Override
		public void onException(JMSException ex) {
			synchronized (connectionMonitor) {
				// Iterate over temporary copy in order to avoid ConcurrentModificationException,
				// since listener invocations may in turn trigger registration of listeners...
				for (ExceptionListener listener : new LinkedHashSet<ExceptionListener>(this.delegates)) {
					listener.onException(ex);
				}
			}
		}
	}

   上述代碼基於Spring Framework 4.3.3.RELEASE版本

 具體的配置信息以下:

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        ...................
        <property name="reconnectOnException" value="true"/> 
</bean>

     上面的修改方案雖然能夠解決ActiveMQ服務器未啓動時應用啓動卡住的問題,可是每次重置JMS鏈接時都會輸出異常信息,這是由於重置鏈接時會將拋出的異常信息輸出到日誌。     

package org.springframework.jms.connection;

public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
		TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
.............
public void onException(JMSException ex) {
		logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
		resetConnection();
	}
.............
}

       若是不想查看重置鏈接時拋出的異常信息,能夠在日誌配置文件中設置日誌輸出級別爲ERROR(以logback爲例) 

<logger name="org.springframework.jms.connection" level="ERROR" />

       

       此外在ActiveMQ沒有啓動的狀況下,若是在項目中爲ActiveMQ消息隊列配置了DefaultMessageListenerContainer對象,它將會持續刷新JMS鏈接,輸出異常信息,默認的刷新間隔是由DefaultMessageListenerContainer對象的backOff屬性對象的interval屬性肯定的

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
...........
/**
	 * The default recovery interval: 5000 ms = 5 seconds.
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);

public void setRecoveryInterval(long recoveryInterval) {
		this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE);
}
...........
}

public class FixedBackOff implements BackOff {

private long interval = 5000L;
........
public FixedBackOff(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }
........
}

    若是在定義DefaultMessageListenerContainer對象時沒有設定recoveryInterval,那它將使用默認的5000ms初始化backoff對象。實際運行時DefaultMessageListenerContainer對象將使用5000ms(5s)這個時間間隔刷新JMS鏈接,輸出鏈接異常信息。若是定義的DefaultMessageListenerContainer對象數目不少,日誌中的鏈接異常信息量將會很大,咱們須要調整這個時間間隔。
    能夠在定義DefaultMessageListenerContainer對象時設置recoveryInterval(下面的例子將recoveryInterval設置爲30000ms,你們能夠根據本身的需求進行調整。) 

<bean id="xxxContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        ..................
		<property name="recoveryInterval" value="30000" />
</bean>
相關文章
相關標籤/搜索