Netty構建分佈式消息隊列(AvatarMQ)設計指南之架構篇

  目前業界流行的分佈式消息隊列系統(或者能夠叫作消息中間件)種類繁多,好比,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特色是,都有一個消息中轉路由節點,按照消息隊列裏面的專業術語,這個角色應該是broker。整個消息系統經過這個broker節點,進行從消息生產者Producer到消費者Consumer的消息路由。固然了,生產者和消費者能夠是多對多的關係。消息路由的時候,能夠根據關鍵字(專業的術語叫topic),進行關鍵字精確匹配、模糊匹配、廣播方式的消息路由。java

  簡單來講,一個極簡的分佈式消息隊列系統主要的構成模塊有:git

  Broker:簡單來講就是消息隊列服務器實體。github

  Producer:消息的生產者,主要用來發送消息給消費者。spring

  Consumer:消息的消費者,主要用來接收生產者的消息。apache

  Routing Key:路由關鍵字(Topic),主要用來控制生產者和消費者之間的發送與接收消息的對應關係。緩存

  Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。安全

  到此爲止,咱們明白了一個分佈式消息隊列系統的主要構成模塊,如今本人就經過Netty,這個優秀的Java NIO網絡通信框架,構建一個支持上述應用場景的分佈式消息隊列系統,本人把其命名爲AvatarMQ。後續我會基於這個開源項目,連載出基於Netty構建分佈式消息隊列系統系列相關的文章,闡明主要的設計思路、組織結構、模塊劃分依據、類圖結構等等。爲了說明方便,後續本文中,若是沒有特殊說明,有涉及基於Netty構建的分佈式消息隊列系統,就是指代AvatarMQ。因爲整個開源項目涉及的代碼量比較多,因此但願你們在本人編寫系列博客文章的基礎上,耐心地理解、分析其中的代碼模塊,相信必定不會讓您失望!服務器

  AvatarMQ基於Netty,因此首先,你要能清楚的理解Netty是什麼?它能作什麼?有興趣的朋友能夠關注一下Netty項目的官網(http://netty.io/),上面有很詳細的入門文章介紹。雖然都是英文的,可是這些一手的資料更具權威性,值得花時間深刻研究探索,畢竟如今流行的雲計算、大數據領域成功的開源項目好比Hadoop、Storm等等,網絡通訊層這塊所有依賴Netty,可見Netty的功能強大。網絡

  基於Netty能夠開發定製高性能、高可靠性的Java企業級服務端應用,而本文是我,在繼利用Netty構建高性能RPC服務器系列文章以後,又一個基於Netty開發的分佈式消息隊列系統(AvatarMQ)。此外AvatarMQ還大量使用了Java多線程的相關類庫。因此但願在此以前,你們能回憶複習一下,這樣理解起來會更加駕輕就熟、事半功倍。多線程

  AvatarMQ是基於Netty構建的分佈式消息隊列系統,支持多個生產者和多個消費者之間的消息路由、傳遞。主要特性以下:

  • AvatarMQ基於Java語言進行編寫,網絡通信依賴Netty。
  • 生產者和消費者的關係能夠是一對多、多對1、多對多的關係。
  • 若干個消費者能夠組成消費者集羣,生產者能夠向這個消費者集羣投遞消息。
  • 消費者集羣對於有共同關注點的消費者支持消息的負載均衡策略。
  • 支持動態新增、刪除生產者、消費者。
  • 目前僅僅支持關鍵字的精確匹配路由,後續會逐漸完善。
  • 消息隊列服務器Broker基於Netty的主從事件線程池模型開發設計。
  • 網絡消息序列化採用Kryo進行消息的網絡序列化傳輸。
  • Broker的消息派發、負載均衡、應答處理(ACK)基於異步多線程模型進行開發設計。
  • Broker消息的投遞,目前支持嚴格的消息順序。其中Broker還支持消息的緩衝派發,即Broker會緩存必定數量的消息以後,再批量分配給對此消息感興趣的消費者。

  AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ

  整個開源項目依賴的jar包請參考:https://github.com/tang-jie/AvatarMQ/blob/master/nbproject/project.properties

  另外,值得注意的是:

  AvatarMQ使用的Netty是基於4.0版本(下載地址:http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2)。

  消息序列化使用的Kryo是基於kryo-3.0.3版本(下載地址:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3)。

  請你們自行去官網下載使用。

  如今,如今言歸正傳,咱們先來看下整合AvatarMQ項目的軟件架構圖:

  

  從上述圖例中,咱們能夠很清楚的看到:生產者和消費者之間是經過Broker進行消息的路由和轉發,同時Broker還負責應答生產者和接收消費者的處理應答。

  在瞭解了,整個AvatarMQ的組織架構以後,咱們再來實際運行一下AvatarMQ!

  首先,先啓動一下Broker服務器(對應代碼:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/spring/AvatarMQServerStartup.java

  若是一切正常,終端控制檯會打印以下輸出:

  

  接着,咱們就來實際驗證一下AvatarMQ的消息推送功能。

  一、生產者發送1條消息給關注這條消息的消費者。咱們先啓動消費者,再啓動生產者。

  其中消費者1的測試代碼(AvatarMQConsumer1.java)以下所示:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer1.java
 * @description:AvatarMQConsumer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer1 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer1 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-1", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者1的測試代碼(AvatarMQProducer1.java)以下所示,其含義是發送1條消息,給關注「AvatarMQ-Topic-1」主題的消費者:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer1.java
 * @description:AvatarMQProducer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer1 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-1");
        producer.setClusterId("AvatarMQCluster");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送開始", 50, "*"));

        for (int i = 0; i < 1; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer1[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer1 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送完畢", 50, "*"));
    }
}

  首先咱們先來啓動消費者,若是一切正常,控制檯輸出結果爲:

  這個時候咱們再運行生產者,發送一條消息給消費者。啓動生產者以後,控制檯輸出結果以下:

  那如今,咱們切回去看下消費者是否收到生產者的消息了呢?

  很是正確,咱們的消費者果真收到了生產者發送過來的消息。

 

  二、生產者發送1條消息給不關注這條消息的消費者。

  首先說明的是,代碼樣例仍是基於上述的AvatarMQConsumer1.java、AvatarMQProducer1.java。只不過此次是生產者發送的主題改爲:「AvatarMQ-Topic-Test」,消費者關注的主題改爲「AvatarMQ-Topic-1」。而後依次啓動消費者、生產者。下面是實際的運行狀況:

  生產者成功發送消息:

  那按照要求,消費者應該沒法收到生產者的這條消息,實際狀況是否是這樣呢?事實勝於雄辯,看以下截圖所示:

  消費者依然處理啓動監聽狀態,說明徹底符合咱們的預期。

 

  三、生產者發送N條消息(這裏是發送100條消息)給一個消費者集羣(有2個消費者組成,而且這2個消費者關注的消息主題topic是相同的)。

  咱們先啓動2個消費者,再啓動生產者。消費者代碼參考:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer2.java
 * @description:AvatarMQConsumer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer2 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer2 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-2", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster2");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者代碼參考(目的是發送100條消息)給消費者集羣。

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer2.java
 * @description:AvatarMQProducer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer2 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-2");
        producer.setClusterId("AvatarMQCluster2");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送開始", 50, "*"));

        for (int i = 0; i < 100; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer2[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer2 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送完畢", 50, "*"));
    }
}

  咱們依次啓動消費者AvatarMQConsumer2兩次,這個時候終端控制檯依次輸出:

  這個時候咱們再啓動生產者,運行截圖以下:

  說明生產者發送了100條消息出去,看下咱們消費者1接收的狀況:

  繼續看下咱們的消費者2,消息接收的狀況,截圖以下:

  最終統計一下,消費者1,接收的消息編號都是奇數,一共50個。消費者2,接收到的消息編號都是偶數,一共50個。兩個消費者接收的消息總數加起來,恰好等於生產者發送的消息總數100個,徹底符合咱們的預期!另外消費者一、消費者2都收到了來自生產者的消息,說明Broker進行了消息的路由傳遞。

  四、多個生產者和多個消費者的消息傳遞,以及動態新增、刪除生產者、消費者。

  這個就交給你們自行測試了,因爲篇幅有限,在此本人就不一一闡述。

 

  到目前爲止,相信你們對於AvatarMQ所具有的基本功能,有了一個大體的印象。固然,AvatarMQ還有一些美中不足,好比:

  • 不支持消息的刷盤存儲,可能因爲系統Crash,形成消息的丟失。後續須要接入一個存儲系統(基於Java NIO),保證消息的持久序列化。
  • AvatarMQ的生產者、消費者模塊,要進一步支持,斷網重連Broker的功能,確保在Broker重啓的狀況下,把在途的消息繼續發送、接收完畢。
  • Broker單點的問題,根據高可用性集羣HA(High Available)的標準,Broker也要有主節點和從節點機制。在主節點宕機的狀況,從節點要能灰度過渡,不至於Broker主節點宕機,整個AvatarMQ消息系統陷入癱瘓狀態。
  • 消息應答失敗,還未支持重試功能。
  • 固然還有一些未知的bug,有待發現和修復。
  • AvatarMQ的處理性能,未經歷過生產系統實際檢驗,暫時沒法保證其安全和可靠性。

  因爲代碼編寫、測試等等工做,都是本人利用工做之餘的時間完成,時間點上比較倉促。加上本人的技術水平有限,不免有說的不對及寫得很差的地方,或者其中應該有更好的解決方案。歡迎廣大同行、愛好者在線下進行學習交流,有什麼寶貴的建議和觀點,懇請批評指正,不吝賜教。雖然AvatarMQ和業界主流、久經考驗的消息隊列系統,在處理性能、可靠性上,確定還有不小的差距。可是能夠基於此,加深對分佈式消息隊列的理解,作到知其然知其因此然,何樂而不爲?

  最後,本人後續會逐漸推出「基於Netty構建的分佈式消息隊列系統(AvatarMQ)」,架構設計、原理分析的詳解連載文章,敬請期待!

  PS:目前AvatarMQ已經開源,整個項目託管到github,對應的網址爲:https://github.com/tang-jie/AvatarMQ,歡迎有興趣的同行朋友、愛好者關注支持。若是以爲還不錯,能夠點擊Star收藏、關注。固然,你還能夠點擊推薦本文,也算是對我辛苦付出的一點支持和回報,謝謝你們!

相關文章
相關標籤/搜索