rabbitMq

rabbitmq的基礎知識

參考資料: 很豐富的面試題 html

rabbitmq.png

基本概念

broker: 實體服務器 VirtualHost:縮小版的RabbitMq服務器,擁有本身獨立的交換器、消息隊列和相關的對象 Exchange:接受生產者的消息,並將這些消息路由到具體的Queue中 Binding:Exchange和Queue之間的關聯 Queue:用來保存消息直到發送給消費者,它是消息的容器。 Channel:多路複用鏈接中的獨立的雙向數據流通道,由於創建和銷燬TCP的Connection開銷node

rabbitmq對消息的保存方式

  1. disk 後綴.rdp a. 在發送時指定須要持久化或者服務器內存緊張時會將部分中的內存消息保存到磁盤中 b. 單個文件增長到16M後會生成新的文件 c. 文件中的消息被標記刪除的比例達到閾值時會觸發文件的合併,提升磁盤的利用率
  2. RAM,內存保存,效率高

Exchange 類型

訂閱模式(Fanout Exchange):

會將消息放到全部綁定到該exchange的隊列上 面試

rabbitMq_fanOut.png

public class ConnectionUtils {

    private static Connection connection;
    private static String lock = "aaa";

  /**
     * 獲取rabbitmq鏈接
     * @return
     */
    public static Connection getConnection() {
        if (null != connection) {
            return connection;
        }
        synchronized (lock) {
            if (null != connection) {
                return connection;
            }

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("host");
            connectionFactory.setUsername("userName");
            connectionFactory.setPassword("password");
            connectionFactory.setVirtualHost("/vhost");
            try {
                connection = connectionFactory.newConnection();
            } catch (IOException e) {
                throw new RuntimeException("IoException", e);
            } catch (TimeoutException e) {
                throw new RuntimeException("timeOutException", e);
            }
            return connection;
        }
    }

}
複製代碼
public class FanOutProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明一個exchange,fanout類型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "fanout", false);

        //發送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME, "", null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }

        //關閉鏈接
        channel.close();
        connection.close();
    }
}
複製代碼
public class FanOutConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
    private static final String QUEUE_NAME = "test_queue_Name_li_fanout";

    public static void main(String[] args) throws IOException {
        //創建連接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange和queue--exchange要和生產者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "fanout");

        //將queue綁定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, "");

        //定義消費者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //開始消費
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
複製代碼
public class FanOutConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
    private static final String QUEUE_NAME = "test_queue_Name_li_fanout02";
....
}
複製代碼
兩個消費者都能打印以下
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼

Direct Exchange

只會把消息routingkey一致的queue中 安全

rabbitMq_binding.png

rabbitMq_Direct.png

public class DirectProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明一個exchange,direct類型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "direct", false);

        //發送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME, ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }

        //關閉鏈接
        channel.close();
        connection.close();
    }
}
複製代碼
public class DirectConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String QUEUE_NAME = "test_queue_Name_li_direct";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException {
        //創建連接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange和queue--exchange要和生產者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "direct");

        //將queue綁定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);

        //定義消費者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //開始消費
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
複製代碼
public class DirectConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String QUEUE_NAME = "test_queue_Name_li_direct01";
    private static final String ROUTING_KEY = "direct_routing_key02";
....
}
複製代碼
運行完成,只有DirectConsumer01能收到消息
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼

Topic Exchange

對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.」只匹配」abc.def」bash

rabbitMq_Topic.png

public class TopicProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";

    private static final String ROUTING_KEY = "test.routingkey.01";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建鏈接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明一個exchange,topic類型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "topic", false);

        //發送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME,ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }
        //關閉鏈接
        channel.close();
        connection.close();
    }
}
複製代碼
public class TopicConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_01";
    private static final String ROUTING_KEY = "test.routingkey.#";

    public static void main(String[] args) throws IOException {
        //創建連接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange和queue--exchange要和生產者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "topic");

        //將queue綁定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);

        //定義消費者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //開始消費
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
複製代碼
爲了節省篇幅,
public class TopicConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_02";
    private static final String ROUTING_KEY = "test.routingkey.*";
......
}

public class TopicConsumer03 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_03";
    private static final String ROUTING_KEY = "test.routingkey";
....
}
複製代碼
輸出結果只有TopicConsumer01和TopicConsumer02有日誌輸出
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼

內容的持久化durable 表示持久化

  1. Queue的持久化
com.rabbitmq.client.Channel

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

複製代碼
  1. Message的持久化
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;


public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;

其中deliveryMode=2表示持久化
複製代碼
  1. Exchange的持久化
com.rabbitmq.client.Channel

Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;

複製代碼
  1. binding是如何持久化的 當queueexchange都被定義爲持久化的狀況下,兩種個相關聯的binding也會被保存下來,若是queue或者exchange被刪除,這個綁定關係也會被刪除

就算上面三個的持久化方式都開啓了,也不能保證消息在使用過程當中徹底不丟失,若是消費者autoAck=true,在收到消息以後自動確認了,可是處理時服務崩了,就會致使消息的丟失,因此須要確認機制的支持服務器

消息投遞的確認模式

  1. 默認狀況下,生產者投遞消息後,broker時不會作出任何返回的
  2. 解決方式以下:
    1. 使用Amqp協議中的事務機制效率低,影響吞吐量
    2. 將信道channel設置成確認模式

channel信道的確認模式

channel設置成確認模式以後,全部提交的消息都會被分配一條惟一的ID,當消息被投遞到匹配的隊列中,信道會向生產者發出確認消息,而且消息中帶上這個Id。確認模式是異步的,生產者能夠發送完一條消息後繼續發送下一條消息。調用channel的confirmSelect方法開啓確認模式網絡

  1. 普通方式,發送完成以後調用waitForConfirms
  2. 異步回調模式,addConfirmListener註冊回調函數

消息消費的應答模式

  1. autoAck,若是等於true,會在消息發送過來以後自動響應--隊列會將該消息刪除,可能會致使消息消費失敗了,可是消息已經被刪除的狀況
  2. autoAck=false,須要業務邏輯在處理完成以後,調用channel.basicAck作顯示的響應

消費者獲取消息時,能夠指定預取的消息數量 經過channel的basicQos方法設置app

rabbitmq 的死信隊列至關於爲一個隊列設置一個備用的隊列,在出現如下狀況的時候將所謂的死亡信息推送到死亡信息隊列

  1. 消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=false
  2. 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
  3. 隊列超載 具體內容參考:my.oschina.net/u/2948566/b…

rabbitmq的blackhole問題

什麼是blackhole問題 生產者向exchange投遞message,而因爲各類緣由致使該 message 丟失,但發送者殊不知道異步

致使balckhole問題的緣由
  1. 向未綁定 queue 的 exchange 發送 message
  2. exchange 以 binding_key key_A綁定了 queue queue_A,但向該 exchange 發送 message 使用的 routing_key 倒是 key_B。使用了不存在的綁定關係

如何防止blackhole 沒有特別好的辦法,只能在具體實踐中經過各類方式保證相關 fabric 的存在。另外,若是在執行 Basic.Publish 時設置 mandatory=true ,則在遇到可能出現 blackholed 狀況時,服務器會經過返回 Basic.Return 告之當前 message 沒法被正確投遞(內含緣由 312 NO_ROUTE)。ide

Consumer Cancellation Notification 機制

channel消費queue的時候,可能由於某些緣由致使消費中止,

  1. 消費者發起 basic.cacncel命令
channel.basicCancel(consumerTag);
複製代碼
  1. 隊列被刪除或者節點失敗也有可能致使消費被取消

正常狀況下,第一種狀況,由於是消費者本身發起的,本身能夠感知到,可是第二種狀況若是沒有一個通知機制的話,可能會致使消費者一直傻傻的在等一個不可能來的消息 因此爲了不上面這些狀況出現,RabbitMQ引入了擴展特性:因爲消息中間件代理出現的異常或者正常狀況致使消費者取消,會向對應的消費者(信道)發送basic.cancel,可是由客戶端信道主動向消息中間件代理髮送basic.cancel以取消消費者的狀況下不會受到消息中間件代理的basic.cancel回覆。

channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {

				@Override
				public void handleCancelOk(String consumerTag) {
					System.out.println("收到來自消息中間件代理的basic.cancel-ok回覆,consumerTag=" + consumerTag);
				}

				@Override
				public void handleCancel(String consumerTag) throws IOException {
					System.out.println("收到來自消息中間件代理的basic.cancel回覆,consumerTag=" + consumerTag);
				}
			});
複製代碼

rebbitmq的集羣知識

參考資料: www.cnblogs.com/xishuai/p/r… www.ywnds.com/?p=4741 blog.csdn.net/zhu_tianwei… 對異常狀況解釋的比較多

爲何須要集羣

  1. 擺脫單機資源上的限制,提供更大的吞吐量
  2. 提供更加穩定的更高可用的服務

集羣中節點之間須要同步的信息rabbitmq的元數據

a. 隊列元數據:隊列名稱和它的屬性; b. 交換器元數據:交換器名稱、類型和屬性; c. 綁定元數據:一張簡單的表格展現瞭如何將消息路由到隊列; d. vhost元數據:爲vhost內的隊列、交換器和綁定提供命名空間和安全屬性; e. 元數據信息須要保存的磁盤中,因此每一個集羣中至少須要一個disk節點

所以,當用戶訪問其中任何一個RabbitMQ節點時,經過rabbitmqctl查詢到的queue/user/exchange/vhost等信息都是相同的。

rabbitmq集羣中同步元數據.png

rabbitmq 的集羣模式

集羣內節點的類型

磁盤節點

保存數據到磁盤和內存中,若是集羣中羣都是內存節點,那就不能中止他們,不然元數據就會丟。

RabbitMQ只要求集羣中至少有一個磁盤節點,若是隻有一個磁盤節點,恰好又崩潰了,集羣能夠繼續路由消息,但不能建立隊列、交換器、綁定、添加用戶、更改權限等操做。因此,建議設置兩個磁盤節點,當內存節點重啓後,會鏈接到預先配置的磁盤節點,下載當前集羣元數據拷貝,因此要將全部磁盤節點告訴內存節點。

內存節點

數據只保存到內存中,除非遇到

  1. publish消息的時候指定須要持久化
  2. 內存吃緊的時候,會把部分消息持久化到磁盤

內存節點的特色就是執行效率高

普通模式

不是每一個節點都有全部隊列的徹底拷貝,若是在集羣中建立隊列,只會在單個節點上建立完整的隊列信息(元數據、狀態、內容),全部其餘節點只知道隊列的元數據和指向該隊列的節點指針。

既然一個隊列的數據只存在一個節點上,那麼在鏈接集羣內其餘節點的時候,是如何進行發佈消息和消費消息的呢? 若是消息生產者所鏈接的是節點2或者節點3,此時隊列1的完整數據不在該兩個節點上,那麼在發送消息過程當中這兩個節點主要起了一個路由轉發做用,根據這兩個節點上的元數據(也就是上文提到的:指向queue的owner node的指針)轉發至節點1上,最終發送的消息仍是會存儲至節點1的隊列1上。 一樣,若是消息消費者所鏈接的節點2或者節點3,那這兩個節點也會做爲路由節點起到轉發做用,將會從節點1的隊列1中拉取消息進行消費。

若是節點崩潰了,附加在隊列上的消費者也就沒法接收新的消息了。可讓消費者重連到集羣並從新建立隊列,這種作法僅當隊列沒設置持久化時纔可行,若是作了隊列持久化或消息持久化,必須等到對應的節點恢復了才能被消費,這是爲了確保當失敗的節點恢復後加入集羣,節點上的隊列消息不會丟失。

爲何不將隊列內容和狀態複製到全部節點:

  1. 存儲空間,若是每一個集羣節點都擁有全部隊列的徹底拷貝,添加新節點不會帶來更多存儲空間;
  2. 性能,消息的發佈者須要將消息複製到每個集羣節點,對於持久化消息,網絡和磁盤複製都會增長。

優勢:

  1. 使用集羣能很好的實現服務能力的水平拓展

缺點:

  1. 由於單個隊列只維持在單個節點上,也很難認爲是高可用
  2. 若是是不持久化的消息和隊列,單機宕機後消息會丟失

鏡像模式 把須要的隊列作成鏡像隊列,存在於多個節點,屬於RabbitMQ的HA方案

根據策略能夠爲節點定義鏡像節點,鏡像節點之間能夠實現隊列中消息實體的同步。 對於發送方確認消息,Rabbit會在全部隊列和隊列的從拷貝安全地接收到消息時,纔會通知發送方。

rabbitmq鏡像集羣模式的策略.jpg

優勢:

  1. 由於能對節點維護的隊列中的消息實體作了同步,能夠保證

缺點:

  1. 由於要進行消息實體的複製,因此勢必會影響系統的性能
  2. 網絡通訊也會加大,若是消息量比較大話
相關文章
相關標籤/搜索