消息隊列(基礎篇)

成小胖學習ActiveMQ·基礎篇

過了個春節,回到公司的成小胖變成了成大胖。可是大家千萬別覺得他那個大肚子裏面裝的都是肥肉,裏面的墨水也多了很多嘞,畢竟成小胖利用春節的半個月時間專心學習並研究了 ActiveMQ,嘿嘿……
這不,爲了檢驗下本身的學習成果,上班的第一天成小胖就去找架構師老王交流 ActiveMQ 相關的知識,還順便向老王討了個紅包,可把成小胖給高興壞了。
來,根據你的瞭解說下 ActiveMQ 是什麼。」html

「這個簡單,ActiveMQ 是一個 MOM,具體來講是一個實現了 JMS 規範的系統間遠程通訊的消息代理。它……」mysql

等等,先解釋下什麼是 MOM。」linux

「好。MOM 就是面向消息中間件(Message-oriented middleware),是用於以分佈式應用或系統中的異步、鬆耦合、可靠、可擴展和安全通訊的一類軟件。MOM 的整體思想是它做爲消息發送器和消息接收器之間的消息中介,這種中介提供了一個全新水平的鬆耦合。」sql

JMS呢?數據庫

成小胖是個追求極致的人,爲了解釋得更通俗易懂,索性搬來一塊白板邊畫邊說。apache

「JMS 叫作 Java 消息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在經過提供標準的產生、發送、接收和處理消息的 API 簡化企業應用的開發,相似於 JDBC 和關係型數據庫通訊方式的抽象。」安全


嗯,很好。下面的這些概念你也須要特別理解下網絡

  • Provider:純 Java 語言編寫的 JMS 接口實現(好比 ActiveMQ 就是)
  • Domains:消息傳遞方式,包括點對點(P2P)、發佈/訂閱(Pub/Sub)兩種
  • Connection factory:客戶端使用鏈接工廠來建立與 JMS provider 的鏈接
  • Destination:消息被尋址、發送以及接收的對象

你來講說這其中 P2P 和 Pub/Sub 的區別吧」,老王給成小胖拋出了一個問題。session

成小胖可不是吃素的,畢竟要是吃素的話他也吃不到這麼胖……這些基本概念對他來講都是小事一樁:架構

P2P (點對點)消息域使用 queue 做爲 Destination,消息能夠被同步或異步的發送和接收,每一個消息只會給一個 Consumer 傳送一次。

Consumer 可使用 MessageConsumer.receive() 同步地接收消息,也能夠經過使用MessageConsumer.setMessageListener() 註冊一個 MessageListener 實現異步接收。

多個 Consumer 能夠註冊到同一個 queue 上,但一個消息只能被一個 Consumer 所接收,而後由該 Consumer 來確認消息。而且在這種狀況下,Provider 對全部註冊的 Consumer 以輪詢的方式發送消息。

Pub/Sub(發佈/訂閱,Publish/Subscribe)消息域使用 topic 做爲 Destination,發佈者向 topic 發送消息,訂閱者註冊接收來自 topic 的消息。發送到 topic 的任何消息都將自動傳遞給全部訂閱者。接收方式(同步和異步)與 P2P 域相同。
除非顯式指定,不然 topic 不會爲訂閱者保留消息。固然,這能夠經過持久化(Durable)訂閱來實現消息的保存。這種狀況下,當訂閱者與 Provider 斷開時,Provider 會爲它存儲消息。當持久化訂閱者從新鏈接時,將會受到全部的斷連期間未消費的消息。

「嗯,總結的很不錯,上面的這些知識是學習 ActiveMQ 的理論基礎,是必需要掌握的。」

「既然 JMS 是一個通用的規範,那麼使用它建立應用程序確定也有一個通用的步驟吧?」老王追問道。

「有的有的。要不您來講說這個通用步驟?就當我考考您,哈哈!」成小胖故做聰明,自覺得老王做爲架構師不會關注這些太具體的實現細節。

然而老王平日裏親力親爲,至今還經常擼代碼,怎麼會被這種小 case 所難倒?因而老王分分鐘給出答案:

  • 獲取鏈接工廠
  • 使用鏈接工廠建立鏈接
  • 啓動鏈接
  • 從鏈接建立會話
  • 獲取 Destination
  • 建立 Producer,或
    • 建立 Producer
    • 建立 message
  • 建立 Consumer,或發送或接收message發送或接收 message
    • 建立 Consumer
    • 註冊消息監聽器(可選)
  • 發送或接收 message
  • 關閉資源(connection, session, producer, consumer 等)

「66666,厲害啊個人王哥!」成小胖的小聰明被老王擊得粉碎!

「你嘴皮子耍夠了吧,仍是多動動手吧。如今你手寫上面步驟對應的代碼實現吧」,老王給了成小胖一個眼神,讓他本身慢慢體會……

成小胖也不是省油的燈,立刻擦乾淨白板,現場擼了起來(是擼代碼,擼代碼,擼代碼,重要的事情說三遍):

複製代碼

public class JMSDemo {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        MessageConsumer consumer;
        Message message;
        boolean useTransaction = false;
        try {
                Context ctx = new InitialContext();
                connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
                //使用ActiveMQ時:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TEST.QUEUE");
                //生產者發送消息
                producer = session.createProducer(destination);
                message = session.createTextMessage("this is a test");

                //消費者同步接收
                consumer = session.createConsumer(destination);
                message = (TextMessage) consumer.receive(1000);
                System.out.println("Received message: " + message);
                //消費者異步接收
                consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                                if (message != null) {
                                        doMessageEvent(message);
                                }
                        }
                });
        } catch (JMSException e) {
                ...
        } finally {
                producer.close();
                session.close();
                connection.close();
        }
}

複製代碼

老王滿意的點點頭:「還算不賴哈~ JMS 通用的規範我們都聊完了,下面就來聊點 ActiveMQ 更具體點的東西咯。」

「好啊好啊。要不我先基於本身的學習講講 ActiveMQ 的存儲,您看看我哪裏講的不對或者遺漏的,可好?」成小胖發揮了他一向的積極主動的做風,固然心裏裏仍是想獲得老王的讚許。

「行,那就開始吧。」

ActiveMQ 在 queue 中存儲 Message 時,採用先進先出順序(FIFO)存儲。同一時間一個消息被分派給單個消費者,且只有當 Message 被消費並確認時,它才能從存儲中刪除。

對於持久化訂閱者來講,每一個消費者得到 Message 的副本。爲了節省存儲空間,Provider 僅存儲消息的一個副本。持久化訂閱者維護了指向下一個 Message 的指針,並將其副本分派給消費者。以這種方式實現消息存儲,由於每一個持久化訂閱者可能以不一樣的速率消費 Message,或者它們可能不是所有同時運行。此外,因每一個 Message 可能存在多個消費者,因此在它被成功地傳遞給全部持久化訂閱者以前,不能從存儲中刪除。

很好,上面這段知識很是重要。其實咱們能夠經過表格來更清晰地展現」,老王補充道,並在白板上畫了如下表格:

消息類型 是否持久化 是否有Durable訂閱者 消費者延遲啓動時,消息是否保留 Broker重啓時,消息是否保留
Queue N - Y N
Queue Y - Y Y
Topic N N N N
Topic N Y Y N
Topic Y N N N
Topic Y Y Y Y

成小胖雖然對以上特性作過實踐對比,可是並無想到去畫一個表格出來使對比更加清晰易懂。特別是當他看到老王隨時就畫出這個表格時便驚歎不已,大聲喊道:「老王你太牛了,真是愛死你了!」

周圍的同事聽到後,都齊刷刷的往這邊看過來。

此情此景,老王也很差意思了:「誒誒誒,說話注意哈,不要讓人以爲咱們在搞基。迴歸正題,你再說說 ActiveMQ 經常使用的存儲方式吧。

成小胖羞澀的點點頭,迅速地迴歸原態,一五一十地提及來。

1.KahaDB

ActiveMQ 5.3 版本起的默認存儲方式。KahaDB存儲是一個基於文件的快速存儲消息,設計目標是易於使用且儘量快。它使用基於文件的消息數據庫意味着沒有第三方數據庫的先決條件。

要啓用 KahaDB 存儲,須要在 activemq.xml 中進行如下配置:

複製代碼

<broker brokerName="broker" persistent="true" useShutdownHook="false">
        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
        </persistenceAdapter>
</broker>

複製代碼

2.AMQ

與 KahaDB 存儲同樣,AMQ存儲使用戶可以快速啓動和運行,由於它不依賴於第三方數據庫。AMQ 消息存儲庫是可靠持久性和高性能索引的事務日誌組合,當消息吞吐量是應用程序的主要需求時,該存儲是最佳選擇。但由於它爲每一個索引使用兩個分開的文件,而且每一個 Destination 都有一個索引,因此當你打算在代理中使用數千個隊列的時候,不該該使用它。

複製代碼

<persistenceAdapter>
        <amqPersistenceAdapter
                directory="${activemq.data}/kahadb"
                syncOnWrite="true"
                indexPageSize="16kb"
                indexMaxBinSize="100"
                maxFileLength="10mb" />
</persistenceAdapter>

複製代碼

 3.JDBC

選擇關係型數據庫,一般的緣由是企業已經具有了管理關係型數據的專長,可是它在性能上絕對不優於上述消息存儲實現。事實是,許多企業使用關係數據庫做爲存儲,是由於他們更願意充分利用這些數據庫資源。

複製代碼

<beans>
        <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
                <persistenceAdapter>
                        <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
                </persistenceAdapter>
        </broker>
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
                <property name="username" value="activemq"/>
                <property name="password" value="activemq"/>
                <property name="maxActive" value="200"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>
</beans>

複製代碼

 4.內存存儲

內存消息存儲器將全部持久消息保存在內存中。在僅存儲有限數量 Message 的狀況下,內存消息存儲會頗有用,由於 Message 一般會被快速消耗。在 activema.xml 中將 broker 元素上的 persistent 屬性設置爲 false 便可。

複製代碼

<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
                <transportConnector uri="tcp://localhost:61635"/>
        </transportConnectors>
</broker>

複製代碼

老王聽完後露出讚許的笑容:「連配置都能寫的這麼詳細,看來你確實是作了很多功課,給你點個贊。」老王終究不會吝嗇本身的讚美,他也明白這些讚美對成小胖意味着什麼。

成小胖獲得了老王的讚揚,內心也是吃了蜜通常。

沒等成小胖說話,老王拿起筆走到白板前,說:「下面就根據我在工做中的經歷,給你講講 ActiveMQ 的部署模式。

1.單例模式

這個就不囉嗦了,略過。

2.無共享主從模式

這是最簡單的 Provider 高可用性的方案,主從節點分別存儲 Message。從節點須要配置爲鏈接到主節點,而且須要特殊配置其狀態。

全部消息命令(消息,確認,訂閱,事務等)都從主節點複製到從節點,這種複製發生在主節點對其接收的任何命令生效以前。而且,當主節點收到持久消息,會等待從節點完成消息的處理(一般是持久化到存儲),而後再本身完成消息的處理(如持久化到存儲)後,再返回對 Producer 的回執。

從節點不啓動任何傳輸,也不能接受任何客戶端或網絡鏈接,除非主節點失效。當主節點失效後,從節點自動成爲主節點,而且開啓傳輸並接受鏈接。這是,使用 failover 傳輸的客戶端就會鏈接到該新主節點。

Broker 鏈接配置以下:

failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

可是,這種部署模式有一些限制,

  • 主節點只會在從節點鏈接到主節點時複製其活動狀態,所以當從節點沒有鏈接上主節點以前,任何主節點處理的 Message 或者消息確認都會在主節點失效後丟失。不過你能夠經過在主節點設置 waitForSlave 來避免,這樣就強制主節點在沒有任何一個從節點鏈接上的狀況下接受鏈接。
  • 就是主節點只能有一個從節點,而且從節點不容許再有其餘從節點。
  • 把正在運行的單例配置成無共享主從,或者配置新的從節點時,你都要中止當前服務,修改配置後再重啓才能生效。

在能夠接受一些故障停機時間的狀況下,可使用該模式。

從節點配置:

<services>
        <masterConnector remoteURI="tcp://remotehost:62001" userName="Rob" password="Davies"/>
</services>

此外,能夠配置 shutdownOnMasterFailure 項,表示主節點失效後安全關閉,保證沒有消息丟失,容許管理員維護一個新的從節點。

3.共享存儲主從模式

容許多個代理共享存儲,但任意時刻只有一個是活動的。這種狀況下,當主節點失效時,無需人工干預來維護應用的完整性。另一個好處就是沒有從節點數的限制。

有兩種細分模式:

(1)基於數據庫

它會獲取一個表上的排它鎖,以確保沒有其餘 ActiveMQ 代理能夠同時訪問數據庫。其餘未得到鎖的代理則處於輪詢狀態,就會被當作是從節點,不會開啓傳輸也不會接受鏈接。


(2)基於文件系統

須要獲取分佈式共享文件鎖,linux 系統下推薦用 GFS2。

看到這些乾貨,成小胖欣喜若狂一邊聽一邊記,等老王講完後他還沒記完。而老王則趁機喝了杯鐵觀音潤潤嗓子。

在記錄完老王所講的部署模式後,成小胖也很差意思再讓老王繼續講下去了,畢竟他知道老王常年加班腰間盤突出,不能長時間站着。

「王哥您坐着休息下,我再給您講講我所理解的 ActiveMQ 的網絡鏈接,中不中?」

「中。沒事兒,我身體好着呢~」老王知道成小胖擔憂他的腰,但他仍是那個倔脾氣。成小胖也不敢多耽誤時間,立馬開講。

1.代理網絡

支持將 ActiveMQ 消息代理連接到不一樣拓撲,這就是被人們熟知的代理網絡。

ActiveMQ 網絡使用存儲和轉發的概念,其中消息老是存儲在本地代理中,而後經過網絡轉發到另外一個代理。


當鏈接創建後,遠程代理將把包含其全部持久和活動消費者目的地的信息傳遞給本地代理,本地代理根據信息決定遠程代理感興趣的 Message 並將它發送給遠程代理。

若是但願網絡是雙向的,您可使用網絡鏈接器將遠程代理配置爲指向本地代理,或將網絡鏈接器配置爲雙工,以便雙向發送消息。

複製代碼

<networkConnectors>
        <networkConnector uri="static://(tcp://backoffice:61617)"
                              name="bridge"
                              duplex="true"
                              conduitSubscriptions="true"
                              decreaseNetworkConsumerPriority="false">
        </networkConnector>
</networkConnectors>

複製代碼

注意,配置的順序很重要:

    1.網絡鏈接——須要在消息存儲前創建好鏈接,對應 networkConnectors 元素
    2.消息存儲——須要在傳輸前配置好,對應 persistenceAdapter 元素
    3.消息傳輸——最後配置,對應 transportConnectors 元素

2.網絡發現

(1)動態發現

使用多播來支持網絡動態發現。配置以下:

<networkConnectors>
    <networkConnector uri="multicast://default"/>
</networkConnectors>

其中,multicast:// 中的默認名稱表示該代理所屬的組。所以使用此方式時,強烈推薦你使用一個獨特的組名,避免你的代理鏈接到其餘不相關代理。

(2)靜態發現

靜態發現接受代理 URI 列表,並將嘗試按列表中肯定的順序鏈接到遠程代理。

<networkConnectors>
    <networkConnector uri="static:(tcp://remote-master:61617,tcp://remote-slave:61617)"/>
</networkConnectors>

相關配置以下:

  • initialReconnectDelay:默認值1000,表示嘗試鏈接前的時延。
  • maxReconnectDelay:默認值30000,表示鏈接失敗後到從新創建鏈接之間的時延,僅在 useExponentialBackOff 啓用時生效。
  • useExponentialBackOff:默認值 true,若是啓用,表示每次失敗後增長重建鏈接的時延。
  • backOffMultiplier:默認值2,表示啓用 useExponentialBackOff 後每次的時延增量須要注意的是,網絡鏈接將始終嘗試創建到遠程代理的鏈接。

須要注意的是,網絡鏈接將始終嘗試創建到遠程代理的鏈接。

(3)多鏈接場景


當網絡負載高時,使用多鏈接頗有意義。可是你須要確保不會重複傳遞消息,這能夠經過過濾器來實現。

複製代碼

<networkConnectors>
    <networkConnector uri="static://(tcp://remotehost:61617)"
                              name="queues_only"
                              duplex="true"
        <excludedDestinations>
            <topic physicalName=">"/>
        </excludedDestinations>
    </networkConnector>
    <networkConnector uri="static://(tcp://remotehost:61617)"
                              name="topics_only"
                              duplex="true"
        <excludedDestinations>
            <queue physicalName=">"/>
        </excludedDestinations>
    </networkConnector>
</networkConnectors>

複製代碼

講完後成小胖如釋重負,由於上面這些知識點雖然看起來不多,但他卻花了不少時間看了不少英文資料,同時反覆實踐才理解透的呢。

在成小胖講的這段時間,老王一直坐在轉椅上,這會兒他的腰也舒服了不少。老王站起來拍了拍成小胖的肩膀:「這個知識點雖然理解起來有點晦澀,可是你解釋得仍是挺不錯的。經過今天的交流,能夠看出你對 ActiveMQ 的基礎知識有了不錯的掌握,從此呢仍是要多加深刻實踐,這樣才能使用它提供高質量的服務。」

老王的手機忽然響了,是要去開會了:「今天就到這兒吧,我有個會議要參加。」

「好,謝謝王哥的耐心指導。但願你多注意身體。」

老王鬼魅的一笑:「嗯,放心吧,隔壁有對年輕人剛結婚,聽Ta們說最近想要個小baby,因此我確定會保養好本身的身體的。」

成小胖:「……」

「哈哈,開玩笑的!」

「……」

相關文章
相關標籤/搜索