公司運維同事針對ActiveMQ提出了兩個問題,其中一個是「隊列長時間無人監聽時,自動刪除該隊列」。
調研提出了三種方案。這裏是相關記錄和說明。java
運維同事對生產環境使用的ActiveMQ作了相關監控。這個監控在某個隊列出現消息積壓時(實際規則更復雜一些,而且正在調整)發送短信報警。運維接到短信後會通知開發負責人。開發負責人再檢查系統是否在正常監聽相關隊列。
可是,從過往經驗來看,只有一次消息積壓是業務系統故障致使的;其它狀況(沒有統計到具體數據,大約五六次)都是業務系統已經再也不監聽該隊列致使的。這使得咱們的運維、開發同事半夜三更火急火燎檢查問題,結果發現只須要刪除那個隊列就能夠了。
尤爲惹發起牀氣的是,因爲線上ActiveMQ配置了消息持久化,這種消息積壓其實並不會對ActiveMQ產生多大的影響,徹底能夠在次日上班後再處理。
考慮到你們的睡眠質量和夫妻感情,在JIRA中,咱們調研、討論了三個方案。 spring
在ActiveMQ官方提供的功能列表中,有這樣一項功能:Delete Inactive Destination。它能夠刪除「沒有未處理消息、而且沒有消費者的Destination」。 apache
這個配置比較簡單,在ActiveMQ的配置文件activemq.xml中,作以下改動便可。這裏示例的是對queue的配置;topic配置是相似的。 運維
<!-- 在這裏加上schedulePeriodForDestinationPurge屬性。 --> <broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000" <destinationPolicy> <policyMap> <policyEntries> <!-- 在這裏加上gcInactiveDestinations和inactiveTimoutBeforeGC兩個屬性 --> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> </policyEntries> </policyMap> </destinationPolicy> </broker>
上述示例配置的含義是:這個Broker會每隔10000ms(由schedulePeriodForDestinationPurge配置指定)掃描一次標記有「gcInactiveDestinations="true"」的Queue(因爲這裏配置的是queue=">",於是實際是掃描全部Queue),將其中「沒有未處理消息、而且沒有消費者、而且此狀態已超過30000ms(由inactiveTimoutBeforeGC配置指定)」的隊列刪除掉。有點暈。各配置項的具體說明以下。ide
如下三個配置項中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是選填配置。spring-boot
這是針對Broker的配置,用於聲明「掃描閒置隊列的週期」,單位爲毫秒。默認值爲0,意爲「不掃描」。
須要說明的是,這裏只能配置掃描任務的啓動週期、不能配置啓動延遲。也就是說,配置好了以後,ActiveMQ服務啓動時會當即掃描一次;而後再按照指定時間週期性掃描。this
這是針對Destination的配置,用於聲明當Broker掃描閒置隊列時,是否掃描這個Destination(由queue="xxxx"來指定)。默認值是false。 spa
這也是針對Destination的配置,用於聲明這個Destination閒置多長時間後能夠被刪除。單位毫秒,默認時間60s。
這個配置必須在gcInactiveDestinations被設置爲true的狀況下才會生效。插件
雖然上面介紹了這麼多,但實際上,從第一句話中就能夠看出這個方案沒法解決咱們的問題。由於咱們的問題是要處理「有消息積壓、但沒有消費者的Destination」,而這個方案只能刪除「沒有未處理消息、而且沒有消費者的Destination」。
除此以外,這應該算是最簡單可靠的一種方案了。實際上,對大多數原生Queue來講,業務系統會同時下線其生產者與消費者。這個方案能夠很好的應對這種狀況。線程
ActiveMQ插件(plugin),也有文檔中稱爲攔截器(Interceptor)。兩者實際上是相輔相成的:配置時,咱們須要一個插件;執行時,咱們須要一個攔截器。
ActiveMQ官方提供了幾個插件(日誌、統計、時間戳等),能夠參見官方說明和開發文檔。咱們能夠參考官方示例來自定義一個插件。
ActiveMQ經過解析activemq.xml中的配置,來加載一個插件的。所以咱們從配置入手,逐步搞清楚插件和攔截器是如何工做的。
activemq.xml中的配置其實很簡單,以下所示:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" advisorySupport="false"> <!-- 用plugins標籤聲明這是一個插件 --> <plugins> <!-- bea的語法來自spring,xml name space的聲明已經說明了這一點 --> <bean xmlns="http://www.springframework.org/schema/beans" id="linjunPlugin" class="net.loyintean.blog.jms.manage.PlugIn"/> </plugins> <!-- 其它配置 --> </broker>
上述配置聲明瞭一個插件,插件類名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。
這個類必須包含在ActiveMQ的classpath路徑下。咱們能夠本身打一個jar包,並把jar包放到ActiveMQ的lib路徑下;也能夠修改相關類路徑。總之要保證ActiveMQ可以加載到這個類(及其依賴類)。
其實按照上面的配置,並不須要爲插件配置一個id。不過,插件聲明還有其它方式,有些是須要使用id的。這裏很少說,能夠參考開發文檔。
如配置中的註釋所說,聲明插件所使用的<bean />標籤及語法來自spring。也就是說,spring中的<property />等其它標籤,這裏也是支持的。不過目前尚未找到對@Autowired等註解的支持方式。
因爲我使用的是spring boot,只須要加上一個spring-boot-starter-activemq就能夠引入所需依賴jar包了。不使用spring boot的話,須要引入activemq-broker-x.y.z.jar。
根據ActiveMQ規範,插件必須實現BrokerPlugin接口。這個接口只有一個方法:Broker installPlugin(Broker broker) throws Exception,用於在服務啓動、加載插件時,獲取當前啓動的borker實例,並返回一個Broker實例。
例如,上文中聲明的linjunPlugin代碼以下:
public class PlugIn implements BrokerPlugin { /** * @author linjun * @since 2017年10月30日 * @param b * @return * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker) */ @Override public Broker installPlugin(Broker b) { return new RemoveDestination(b); } }
彷佛有些莫名其妙,但從「裝飾者」的角度來理解就輕鬆愉快了:入參broker是原生實例(固然也多是其它插件「裝飾」過的);出參則是被咱們本身的插件「裝飾」過的、加強版的實例。
通常來講,啓動過程不會作太多處理;處理邏輯在咱們的「裝飾者」中——如上面代碼裏的RemoveDestination。
如上文所說,咱們須要提供的是一個「裝飾」過的Broker。可是Broker是一個接口,其中有超過50個方法,用於處理Broker在服務期間的各類事件(如服務啓動、建立連接、消息收發、事務提交與回滾等等)。直接實現接口未免太醜陋了。ActiveMQ也考慮到了這一點,所以給咱們提供了一個適配器(其實同時也是一個裝飾者):BrokerFilter。它的代碼以下:
public class BrokerFilter implements Broker { // 被「裝飾」的原生實例 protected final Broker next; public BrokerFilter(Broker next) { this.next = next; } // 省略其它接口方法,所有都直接委託給next處理。 }
藉助這個適配器,咱們能夠專一的處理咱們關注的事件。如咱們的RemoveDestination,它只須要在服務啓動時註冊一個定時器,按需刪除無人監聽的隊列便可。代碼以下:
public class RemoveDestination extends BrokerFilter { private Timer timer; /** * @param next */ public RemoveDestination(Broker next) { super(next); // 聲明爲守護線程,避免它阻塞關閉activeMQ的進程 this.timer = new Timer(true); } @Override public void start() throws Exception { super.start(); // DONE linjun 2017-11-01 改成定時調度 this.timer.schedule(new TimerTask() { @Override public void run() { RemoveDestination.this.remove(); } }, 3000, 3000); } private void remove() { Map<ActiveMQDestination, Destination> destinationMap = this .getDestinationMap(); ConnectionContext context = BrokerSupport.getConnectionContext(this); destinationMap.entrySet().forEach(entry -> { Destination destination = entry.getValue(); // 無人監聽了 // DONE linjun 2017-11-01 只處理queue,不處理topic if (destination.getDestinationStatistics().getConsumers() .getCount() == 0) { ActiveMQDestination activeMQDestination = entry.getKey(); if (activeMQDestination.isQueue()) { try { this.removeDestination(context, activeMQDestination, 1); } catch (Exception e) { // 示例代碼,不要噴我直接打印堆棧 e.printStackTrace(); } } } }); } }
除了BrokerFilter這個針對Broker事件作攔截、裝飾的類以外,也有針對Destination的DestinationFilter,不贅述。
不管是BrokerFilter仍是DestinationFilter,在重寫父類的某個方法時,要注意調用super中的對應方法。如RemoveDestination類在覆蓋start()方法時,調用了super.start()方法。
這兩個類中的每個方法,都對應Broker或Destination的一個事件的「處理棧」。若是不調用父類方法,極可能會致使一些基礎的、或關鍵的代碼沒有執行到,進而出現異常。所以,若是不是很是肯定「執行到這裏時必須中斷當前事件」,不然必定要調用super相應方法。
上面的代碼是示例用,還能夠進一步完善。可是這個方案是能夠知足需求的。
不過,這個方案存在一項風險:當咱們刪除一個Destination時,其中全部未消費的消息也會隨之被刪除,即便這些消息已經作了持久化。若是有某個業務系統長時間出現故障、沒法連上ActiveMQ,而ActiveMQ在此期間刪除了它監聽的Destination及其中消息……這個風險機率雖然小,可是影響太大。慎重起見,放棄方案二。
方案三屬於運維的範疇。如JIRA中所討論的,這個問題真正的「痛點」,並非廢棄隊列,而是非緊急狀況卻在半夜報警。所以,由運維同事修改一下腳本,調整「沒有消費者」這種問題的監控報警時間就能夠了。
最後選定的是方案三。方案一不能知足需求;方案二的風險較大。方案三直擊痛點,乾脆利落。這件事也啓示咱們:作事情以前先想清楚目標,謀定然後動。