Java語言快速實現簡單MQ消息隊列服務

使用 JAVA 語言本身動手來寫一個MQ (相似ActiveMQ,RabbitMQ)架構

主要角色

首先咱們必須須要搞明白 MQ (消息隊列) 中的三個基本角色socket

Producer
Broker
Consumer

總體架構以下所示分佈式

自定義協議

首先從上一篇中介紹了協議的相關信息,具體廠商的 MQ(消息隊列) 須要遵循某種協議或者自定義協議 , 消息的 生產者和消費者須要遵循其協議(約定)才能後成功地生產消息和生產消息 ,因此在這裏咱們自定義一個協議以下.ide

消息處理中心 : 若是接收到的信息包含"SEND"字符串,即視爲生產者發送的消息,消息處理中心須要將此信息存儲等待消費者消費學習

消息處理中心 : 若是接受到的信息爲CONSUME,既視爲消費者發送消費請求,須要將存儲的消息隊列頭部的信息轉發給消費者,而後將此消息從隊列中移除測試

消息處理中心 : 若是消息處理中心存儲的消息滿3條仍然沒有消費者進行消費,則再也不接受生產者的生產請求this

消息生產者:須要遵循協議將生產的消息頭部增長"SEND:" 表示生產消息spa

消息消費者:須要遵循協議向消息處理中心發送"CONSUME"字符串表示消費消息日誌

流程順序

項目構建流程

下面將整個MQ的構建流程過一遍code

  1. 新建一個 Broker 類,內部維護一個 ArrayBlockingQueue 隊列,提供生產消息和消費消息的方法, 僅僅具有存儲服務功能
  2. 新建一個 BrokerServer 類,將 Broker 發佈爲服務到本地9999端口,監聽本地9999端口的 Socket 連接,在接受的信息中進行咱們的協議校驗, 這裏 僅僅具有接受消息,校驗協議,轉發消息功能;
  3. 新建一個 MqClient 類,此類提供與本地端口9999的Socket連接 , 僅僅具有生產消息和消費消息的方法
  4. 測試:新建兩個 MyClient 類對象,分別執行其生產方法和消費方法

具體使用流程

  1. 生產消息:客戶端執行生產消息方法,傳入須要生產的信息,該信息須要遵循咱們自定義的協議,消息處理中心服務在接受到消息會根據自定義的協議校驗該消息是否合法,若是合法若是合法就會將該消息存儲到Broker內部維護的 ArrayBlockingQueue 隊列中.若是 ArrayBlockingQueue 隊列沒有達到咱們協議中的最大長度將將消息添加到隊列中,不然輸出生產消息失敗.
  2. 消息消息:客戶端執行消費消息方法, Broker服務 會校驗請求的信息的信息是否等於 CONSUME ,若是驗證成功則從Broker內部維護的 ArrayBlockingQueue 隊列的 Poll 出一個消息返回給客戶端

代碼演示

消息處理中心 Broker

/**
 * 消息處理中心
 */
public class Broker {
    // 隊列存儲消息的最大數量
    private final static int MAX_SIZE = 3;

    // 保存消息數據的容器
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(MAX_SIZE);

    // 生產消息
    public static void produce(String msg) {
        if (messageQueue.offer(msg)) {
            System.out.println("成功向消息處理中心投遞消息:" + msg + ",當前暫存的消息數量是:" + messageQueue.size());
        } else {
            System.out.println("消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!");
        }
        System.out.println("=======================");
    }

    // 消費消息
    public static String consume() {
        String msg = messageQueue.poll();
        if (msg != null) {
            // 消費條件知足狀況,從消息容器中取出一條消息
            System.out.println("已經消費消息:" + msg + ",當前暫存的消息數量是:" + messageQueue.size());
        } else {
            System.out.println("消息處理中心內沒有消息可供消費!");
        }
        System.out.println("=======================");

        return msg;
    }

}

消息處理中心服務 BrokerServer

/**
 * 用於啓動消息處理中心
 */
public class BrokerServer implements Runnable {

    public static int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        )
        {
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("接收到原始數據:" + str);

                if (str.equals("CONSUME")) { //CONSUME 表示要消費一條消息
                    //從消息隊列中消費一條消息
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                } else if (str.contains("SEND:")){
                    //接受到的請求包含SEND:字符串 表示生產消息放到消息隊列中
                    Broker.produce(str);
                }else {
                    System.out.println("原始數據:"+str+"沒有遵循協議,不提供相關服務");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while (true) {
            BrokerServer brokerServer = new BrokerServer(server.accept());
            new Thread(brokerServer).start();
        }
    }
}

客戶端 MqClient

/**
 * 訪問消息隊列的客戶端
 */
public class MqClient {

    //生產消息
    public static void produce(String message) throws Exception {
        //本地的的BrokerServer.SERVICE_PORT 建立SOCKET
        Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
        try (
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            out.println(message);
            out.flush();
        }
    }

    //消費消息
    public static String consume() throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
        try (
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            //先向消息隊列發送命令
            out.println("CONSUME");
            out.flush();

            //再從消息隊列獲取一條消息
            String message = in.readLine();

            return message;
        }
    }

}

測試MQ

public class ProduceClient {

    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient();

        client.produce("SEND:Hello World");
    }

}

public class ConsumeClient {

    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient();
        String message = client.consume();

        System.out.println("獲取的消息爲:" + message);
    }
}

咱們多執行幾回客戶端的生產方法和消費方法就能夠看到一個完整的MQ的通信過程,下面是我執行了幾回的一些日誌

接收到原始數據:SEND:Hello World
成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:1
=======================
接收到原始數據:SEND:Hello World
成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:2
=======================
接收到原始數據:SEND:Hello World
成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:3
=======================
接收到原始數據:SEND:Hello World
消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!
=======================
接收到原始數據:Hello World
原始數據:Hello World沒有遵循協議,不提供相關服務

接收到原始數據:CONSUME
已經消費消息:SEND:Hello World,當前暫存的消息數量是:2
=======================
接收到原始數據:CONSUME
已經消費消息:SEND:Hello World,當前暫存的消息數量是:1
=======================
接收到原始數據:CONSUME
已經消費消息:SEND:Hello World,當前暫存的消息數量是:0
=======================
接收到原始數據:CONSUME
消息處理中心內沒有消息可供消費!
=======================

小結

本章示例代碼主要源自分佈式消息中間件實踐一書 , 這裏咱們本身使用Java語言寫了一個MQ消息隊列 , 經過這個消息隊列咱們對MQ中的幾個角色 "生產者,消費者,消費處理中心,協議" 有了更深的理解 ; 那麼下一章節咱們就來一塊學習具體廠商的MQ RabbitMQ

相關文章
相關標籤/搜索