RabbitMQ簡介、安裝、基本特性API--Java測試

新的閱讀體驗地址:http://www.zhouhong.icu/post/141

本篇文章全部的代碼:https://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo

1、初識RabbitMQ

是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。
AMQP協議Advanced Message Queuing Protocol(高級消息隊列協議)
 定義:具備現代特徵的二進制協議,是一個提供統一消息服務的應用層標準高級消息隊列協議,
是應用層協議的一個開放標準,爲面向消息中間件設計。
AMQP專業術語:
  • Server:又稱broker,接受客戶端的連接,實現AMQP實體服務
  • Connection:鏈接,應用程序與broker的網絡鏈接
  • Channel:網絡信道,幾乎全部的操做都在channel中進行,Channel是進行消息讀寫的通道。客戶端能夠創建多個channel,每一個channel表明一個會話任務。
  • Message:消息,服務器與應用程序之間傳送的數據,由Properties和Body組成.Properties能夠對消息進行修飾,必須消息的優先級、延遲等高級特性;Body則是消息體內容。
  • virtualhost: 虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個virtual host裏面能夠有若干個Exchange和Queue,同一個Virtual Host 裏面不能有相同名稱的Exchange 或 Queue。
  • Exchange:交換機,接收消息,根據路由鍵轉單消息到綁定隊列
  • Binding:  Exchange和Queue之間的虛擬連接,binding中能夠包換routing key
  • Routing key: 一個路由規則,虛擬機可用它來肯定如何路由一個特定消息。(如負載均衡)
RabbitMQ總體架構

Exchange和隊列是多對多關係,實際操做通常爲1個exchange對多個隊列,爲避免設計過於複雜.

2、單機版快速安裝

  • 一、首先在Linux上進行一些軟件的準備工做

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
  • 三、安裝服務命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​
  • 四、啓動

啓動服務
systemctl start rabbitmq-server
查看是否啓動
lsof -i:5672
  • 五、啓動、安裝web管理插件(管控臺)

rabbitmq-plugins enable rabbitmq_management
  • 六、查看管理端口有沒有啓動

lsof -i:15672
或者:
netstat -tnlp | grep 15672
  • 七、添加用戶

#添加用戶 用戶名 admin 密碼 admin web管理工具可用此用戶登陸
sudo rabbitmqctl add_user admin admin
#設置用戶角色 管理員
sudo rabbitmqctl set_user_tags admin administrator
#設置用戶權限(接受來自全部Host的全部操做)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"  
#查看用戶權限
sudo rabbitmqctl list_user_permissions admin
  • 從新啓動

systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management

  • 代碼測試
  1. 引入依賴
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>

        2.發送端:html

package com.zhouhong.rabbitmq.api.helloworld;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
    public static void main(String[] args) throws Exception {
        //    1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //    2 建立Connection
        Connection connection = connectionFactory.newConnection();
        //    3 建立Channel
        Channel channel = connection.createChannel();  
        //    4 聲明
        String queueName = "test001";  
        //    參數: queue名字,是否持久化,獨佔的queue(僅供此鏈接),不使用時是否自動刪除, 其餘參數
        channel.queueDeclare(queueName, false, false, false, null);
        Map<String, Object> headers = new HashMap<String, Object>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .headers(headers).build();
        for(int i = 0; i < 5;i++) {
            String msg = "Hello World RabbitMQ " + i;
            channel.basicPublish("", queueName , props , msg.getBytes());             
        }
    }
}

    3.接收端java

package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();        
        Channel channel = connection.createChannel();          
        String queueName = "test001";  
        //    durable 是否持久化消息
        channel.queueDeclare(queueName, false, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //    循環獲取消息  
        while(true){  
            //    獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

    4.結果(先啓動接收端進行監控,再啓動發送端)c++

收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4

3、RabbitMQ----交換機

  1. Name:交換機名稱。
  2. Type:交換機類型 direct、topic、fanout、headers。
  3. Durability:是否持久化,ture爲持久化。
  4. Auto Delete :當最後一個綁定道Exchange上的隊列刪除後,自動刪除該Exchange。
  5. Internal:當前Exchange是否用於RabbitMQ內部使用,默認爲False。
  6. Arguments:擴展參數,用於擴展AMQP協議自制定化使用。
  7. DirectExchange的消息被轉發道RouteKey中指定的Queue。
交換機-----Direct exchange
Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。

代碼:
  • 發送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");
        //2 建立Connection
        Connection connection = connectionFactory.newConnection();
        //3 建立Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        //必需要和接收端 routingKey 一一對應
        String routingKey = "test_direct_routingKey";
        //5 發送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());                 
    }
}
  •  接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {    
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");    
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
交換機-----topic exchange
exchange 將Routekey和某個topic進行一個模糊匹配,發送給對應隊列、能夠用通配符進行匹配

好比下面例子
代碼:
  • 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;         
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();       
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // 只能匹配一個 例如:user.txt、user.py均可以,可是user.txt.py 不行
        //String routingKey = "user.*";
        // user.txt、user.py 、user.txt.py 均可以匹配到
        String routingKey = "user.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        System.err.println("consumer1 start.. ");
        //    循環獲取消息  
        while(true){  
            //    獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());  
        } 
    }
}
  • 發送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {    
    public static void main(String[] args) throws Exception {        
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        //2 建立Connection
        Connection connection = connectionFactory.newConnection();
        //3 建立Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發送        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }    
}
交換機-----Fanout exchange 廣播模式
1.不處理路由鍵,只須要簡單的將隊列綁定到交換機上。
2.發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。
3.Fanout交換機轉發消息是最快的。
代碼:見示例文章開始GitHub地址

4、RabbitMQ高級特性

一、消息如何保障 100% 的投遞成功
生產端的可靠性投遞的標誌:
一、消息成功發出
二、mq節點成功接收
三、發送端MQ節點確認應答
四、完善的消息補償機制
解決:消息信息落庫,對消息狀態進行打標
冪等性
    一、 select count(1) from t_order where id = 惟一id(或)指紋碼
    二、惟一id或指紋碼機制,利用數據庫主鍵去重
二、Confirm
第一步:再channel上開啓確認模式:channel.confirmSelect();
第二步:再channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日期等後續處理!
三、return消息機制
ReturnListener用於處理不可路由的消息
咱們的消息生產者,經過指定一個Exchage和Routingkey,把消息送達某一個隊列中去,而後咱們的消費者監聽隊列,進行消費處理操做,若是沒有合適的隊列,則會由returnListener進行接受。
Mandatory:若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲false,那麼broker端自動刪除該消息。
四、消費端ACK與重回隊列
消費端ACK:
  • 在工做的時候通常不會選擇自動ack
  • 消費端的手工ack分爲兩種ACK和NACK
  • 消費端進行消費的時候,若是因爲業務異常咱們能夠進行日誌的記錄,而後進行補償。這種建議回覆NACK,不要重回隊列
  • 若是因爲服務器宕機等嚴重問題,那咱們就須要手工進行ACK保障消費端消費成功
消費端的重回隊列
  • 是爲了對沒有處理成功的消息,把消息從新會投遞給broker。
  • 重回隊列,會回到隊列的尾部
  • 也會形成一條消息一直重複投遞,死循環了
  • 在實際應用中,都會關閉重回隊列,也就是設置爲false
五、TTL隊列和消息
TTL: time to live的縮寫,也就是生存時間。
  • RabbitMQ 支持消息過時時間,在消息發送時能夠進行指定
  • RabbitMQ支持隊列過時時間,從消息入隊列開始計算,只要超過了隊列的超時間時間配置,那麼消息會自動的清除
死隊列: DLX,Dead-Letter-Exchange
  • 利用DLX,當消息在一個隊列中變成死信(dead message)以後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX.
消息變成死信的幾種狀況
  • 消息被拒絕 而且requeue = false
  • 消息TTL過時
  • 隊列達到最大長度
DLX也是一個正常的Exchange,其實是一個屬性控制
  • 當隊列中有死信時,RabbitMQ就會自動的將這個消息從新發布到設置的Exchange上,進而被路由到另外一個隊列.
  • 能夠監聽這個隊列中消息作相應的處理,這個特性能夠彌補rabbitMQ3.0之前的immediate參數功能。
  • 在正常隊列上添加參數:arguments.put("x-dead-letter-exchange","dlx.exchange");這樣消息過時、requeue、隊列達到最大長度時,就能夠直接路由到死信隊列。
相關文章
相關標籤/搜索