RabbitMQ入門案例

RabbitMQ入門案例

Rabbit 模式

https://www.rabbitmq.com/getstarted.htmlhtml

實現步驟

  • 構建一個 maven工程
  • 導入 rabbitmq的依賴
  • 啓動 rabbitmq-server服務
  • 定義生產者
  • 定義消費者
  • 觀察消息的在 rabbitmq-server服務中的進程

初步實現

前期準備

1.構建項目

1

2.導入依賴

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

簡單模型

4

在上圖的模型中,有如下概念:java

  1. 生產者,也就是要發送消息的程序
  2. 消費者:消息的接受者,會一直等待消息到來。
  3. 消息隊列:圖中紅色部分。相似一個郵箱,能夠緩存消息;生產者向其中投遞消息,消費者從其中取出消息。

全部的中間件技術都是基於tcp/ip協議基礎之上構建新型的協議規範,只不過rabbitmq遵循的是amqp緩存

實現步驟:安全

  1. 建立鏈接工程
  2. 建立鏈接 connection
  3. 經過鏈接獲取通道 Channel
  4. 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息,和接收消息
  5. 準備消息內容
  6. 發送消息給隊列 queue
  7. 關閉鏈接
  8. 關閉通道

生產者

public class Producer {
    public static void main(String[] args) {
        //1. 建立鏈接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //這裏要使用本身的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //2. 建立鏈接 connection
            connection = connectionFactory.newConnection("生產者");
            //3. 經過鏈接獲取通道 Channel
            channel = connection.createChannel();
            //4. 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息
            String quequeName = "queuel";
            /**
             * @params1 隊列的名稱
             * @params2 是否要持久化 durable-false
             * @params3 排他性,是不是獨佔獨立
             * @params4 是否自動刪除,隨着最後一個消費者消息完畢之後是否把隊列自動刪除
             * @params5 攜帶的附屬參數
             */
            channel.queueDeclare(quequeName,false,false,false,null);
            //5. 準備消息內容
            String message = "Hello,Consumer";
            //6. 發送消息給隊列 queue
            channel.basicPublish("",quequeName,null,message.getBytes());
            System.out.println("消息發送成功");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7. 關閉鏈接
            if (channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //8. 關閉通道
            if (connection != null && connection.isOpen()){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消費者

public class Consumer {

    public static void main(String[] args) {
        //1. 建立鏈接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //這裏要使用本身的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2. 建立鏈接 connection
            connection = connectionFactory.newConnection("消費者");
            //3. 經過鏈接獲取通道 Channel
            channel = connection.createChannel();
            //4. 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息,和接收消息
            String quequeName = "queue1";
            channel.queueDeclare(quequeName,false,false,false,null);
            //5.監聽消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                /*
                consumerTag:消息者標籤,channel.basicConsume能夠指定
                envelope:消息包內容,可從中獲取消息id,消息routing key,交換機,消息和重裝標記(收到消息失敗後是否須要從新發送)
                properties:消息屬性
                body;消息
                */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由key
                    System.out.println("路由key爲:"+ envelope.getRoutingKey());
                    //交換機
                    System.out.println("交換機爲:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id爲:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("======================================================");
                    System.out.println("");
                }
            };
            channel.basicConsume("queue1", true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //6. 不關閉資源,一直監聽
        }
    }
}

5

5

AMQP

概念介紹

AMQP 一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。服務器

AMQP是一個二進制協議,擁有一些現代化特色:多信道協商式異步安全擴平臺中立高效網絡

RabbitMQ 是 AMQP協議 的 Erlang的實現。異步

概念 說明
鏈接 Connection 一個網絡鏈接,例如:TCP/IP套接字鏈接。
會話 Session 端點之間的命名對話。在一個會話上下文中,保證「剛好傳遞一次」。
信道 Channel 多路複用鏈接中的一條獨立的雙向數據流通道。爲會話提供物理傳輸介質。
客戶端 Client AMQP鏈接或者會話的發起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。
服務節點Broker 消息中間件的服務節點。通常狀況下能夠將一個RabbitMQ Broker看做一臺RabbitMQ 服務器。
端點 AMQP對話的任意一方。一個AMQP鏈接包括兩個端點(一個是客戶端,一個是服務器)。
消費者 Consumer 一個從消息隊列裏請求消息的客戶端程序。
生產者 Producer 一個向交換機發布消息的客戶端應用程序。

RabbitMQ運轉流程

入門案例 爲例maven

生產者發送消息

  1. 生產者建立鏈接(Connection),開啓一個信道(Channel),鏈接到RabbitMQ Broker;
  2. 聲明隊列、設置屬性;如是否排它,是否持久化,是否自動刪除;
  3. 將路由鍵(空字符串)與隊列綁定起來;
  4. 發送消息至RabbitMQ Broker;
  5. 關閉信道;
  6. 關閉鏈接;

消費者接收消息

  1. 消費者建立鏈接(Connection),開啓一個信道(Channel),鏈接到RabbitMQ Broker
  2. 向Broker 請求消費相應隊列中的消息,設置相應的回調函數;
  3. 等待Broker迴應閉關投遞響應隊列中的消息,消費者接收消息;
  4. 確認(ack,自動確認)接收到的消息;
  5. RabbitMQ從隊列中刪除相應已經被確認的消息;
  6. 關閉信道;
  7. 關閉鏈接;

生產者流轉過程解析

  1. 客戶端與代理服務器Broker創建鏈接。調用 newConnection() 方法 , 會進一步封裝 Protocol Header 0-9-1 的報文頭髮送給 Broker ,以此通知Broker 本次交互採用的是 AMQP 0-9-1 協議,緊接着 Broker 返回 Connection.Start 來創建鏈接,在鏈接的過程當中涉及 Connection.Start/.Start-OKConnection.Tune/.Tune-OkConnection.Open/ .Open-Ok 這6 個命令的交互。
  2. 客戶端調用 connection.createChannel 方法。此方法開啓信道,其包裝的 channel.open 命令發送給 Broker , 等待 channel.basicPublish 方法,對應的AMQP命令爲 Basic.Publish , 這個命令包含了content Headercontent Body() 。content Header 包含了消息體的屬性,例如:投遞模式,優先級等,content Body 包含了消息體自己。
  3. 客戶端發送完消息須要關閉資源時,涉及到Channel.Close和Channl.Close-OkConnetion.Close和Connection.Close-Ok的命令交互。

6

消費者流轉過程解析

  1. 消費者客戶端與代理服務器Broker創建鏈接。會調用 newConnection() 方法,這個方法會進一步封裝 Protocol Header 0-9-1 的報文頭髮送給Broker ,以此通知Broker 本次交互採用的是 AMQP 0-9-1 協議,緊接着Broker 返回Connection.Start 來創建鏈接,在鏈接的過程當中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的交互。
  2. 消費者客戶端調用connection.createChannel方法。和生產者客戶端同樣,協議涉及Channel . Open/Open-Ok命令。
  3. 在真正消費以前,消費者客戶端須要向Broker 發送Basic.Consume 命令(即調用channel.basicConsume 方法〉將Channel 置爲接收模式,以後Broker 回執 Basic . Consume - Ok 以告訴消費者客戶端準備好消費消息。
  4. Broker 向消費者客戶端推送(Push) 消息,即 Basic.Deliver 命令,這個命令和 Basic.Publish 命令同樣會攜帶 Content Header 和Content Body。
  5. 消費者接收到消息並正確消費以後,向Broker 發送確認,即 Basic.Ack 命令。
  6. 客戶端發送完消息須要關閉資源時,涉及到 Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok 的命令交互。

7

我的博客爲:
MoYu's HomePagetcp

相關文章
相關標籤/搜索