參考資料: 很豐富的面試題 html
broker
: 實體服務器 VirtualHost
:縮小版的RabbitMq服務器,擁有本身獨立的交換器、消息隊列和相關的對象 Exchange
:接受生產者的消息,並將這些消息路由到具體的Queue中 Binding
:Exchange和Queue之間的關聯 Queue:
用來保存消息直到發送給消費者,它是消息的容器。 Channel
:多路複用鏈接中的獨立的雙向數據流通道,由於創建和銷燬TCP的Connection開銷node
會將消息放到全部綁定到該exchange的隊列上
面試
public class ConnectionUtils {
private static Connection connection;
private static String lock = "aaa";
/**
* 獲取rabbitmq鏈接
* @return
*/
public static Connection getConnection() {
if (null != connection) {
return connection;
}
synchronized (lock) {
if (null != connection) {
return connection;
}
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("host");
connectionFactory.setUsername("userName");
connectionFactory.setPassword("password");
connectionFactory.setVirtualHost("/vhost");
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
throw new RuntimeException("IoException", e);
} catch (TimeoutException e) {
throw new RuntimeException("timeOutException", e);
}
return connection;
}
}
}
複製代碼
public class FanOutProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//創建鏈接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個exchange,fanout類型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "fanout", false);
//發送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, "", null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//關閉鏈接
channel.close();
connection.close();
}
}
複製代碼
public class FanOutConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout";
public static void main(String[] args) throws IOException {
//創建連接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明exchange和queue--exchange要和生產者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "fanout");
//將queue綁定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, "");
//定義消費者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//開始消費
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
複製代碼
public class FanOutConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout02";
....
}
複製代碼
兩個消費者都能打印以下
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼
只會把消息routingkey一致的queue中
安全
public class DirectProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
//創建鏈接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個exchange,direct類型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "direct", false);
//發送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//關閉鏈接
channel.close();
connection.close();
}
}
複製代碼
public class DirectConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException {
//創建連接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明exchange和queue--exchange要和生產者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "direct");
//將queue綁定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定義消費者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//開始消費
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
複製代碼
public class DirectConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct01";
private static final String ROUTING_KEY = "direct_routing_key02";
....
}
複製代碼
運行完成,只有DirectConsumer01能收到消息
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼
對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.」只匹配」abc.def」bash
public class TopicProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String ROUTING_KEY = "test.routingkey.01";
public static void main(String[] args) throws IOException, TimeoutException {
//創建鏈接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個exchange,topic類型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "topic", false);
//發送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME,ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//關閉鏈接
channel.close();
connection.close();
}
}
複製代碼
public class TopicConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_01";
private static final String ROUTING_KEY = "test.routingkey.#";
public static void main(String[] args) throws IOException {
//創建連接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明exchange和queue--exchange要和生產者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "topic");
//將queue綁定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定義消費者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//開始消費
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
複製代碼
爲了節省篇幅,
public class TopicConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_02";
private static final String ROUTING_KEY = "test.routingkey.*";
......
}
public class TopicConsumer03 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_03";
private static final String ROUTING_KEY = "test.routingkey";
....
}
複製代碼
輸出結果只有TopicConsumer01和TopicConsumer02有日誌輸出
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
複製代碼
durable 表示持久化
com.rabbitmq.client.Channel
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
複製代碼
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
其中deliveryMode=2表示持久化
複製代碼
com.rabbitmq.client.Channel
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
複製代碼
queue
和exchange
都被定義爲持久化的狀況下,兩種個相關聯的binding也會被保存下來,若是queue
或者exchange
被刪除,這個綁定關係也會被刪除就算上面三個的持久化方式都開啓了,也不能保證消息在使用過程當中徹底不丟失,若是消費者autoAck=true,在收到消息以後自動確認了,可是處理時服務崩了,就會致使消息的丟失,因此須要確認機制的支持服務器
效率低,影響吞吐量
channel設置成確認模式以後,全部提交的消息都會被分配一條惟一的ID,當消息被投遞到匹配的隊列中,信道會向生產者發出確認消息,而且消息中帶上這個Id。確認模式是異步的,生產者能夠發送完一條消息後繼續發送下一條消息。調用channel的confirmSelect方法開啓確認模式
網絡
消費者獲取消息時,能夠指定預取的消息數量 經過channel的basicQos方法設置app
至關於爲一個隊列設置一個備用的隊列,在出現如下狀況的時候將所謂的死亡信息推送到死亡信息隊列
requeue=false
什麼是blackhole問題 生產者向exchange投遞message,而因爲各類緣由致使該 message 丟失,但發送者殊不知道異步
使用了不存在的綁定關係
如何防止blackhole 沒有特別好的辦法,只能在具體實踐中經過各類方式保證相關 fabric 的存在。另外,若是在執行 Basic.Publish 時設置 mandatory=true ,則在遇到可能出現 blackholed 狀況時,服務器會經過返回 Basic.Return 告之當前 message 沒法被正確投遞(內含緣由 312 NO_ROUTE)。ide
channel消費queue的時候,可能由於某些緣由致使消費中止,
channel.basicCancel(consumerTag);
複製代碼
正常狀況下,第一種狀況,由於是消費者本身發起的,本身能夠感知到,可是第二種狀況若是沒有一個通知機制的話,可能會致使消費者一直傻傻的在等一個不可能來的消息 因此爲了不上面這些狀況出現,RabbitMQ引入了擴展特性:因爲消息中間件代理出現的異常或者正常狀況致使消費者取消,會向對應的消費者(信道)發送basic.cancel,可是由客戶端信道主動向消息中間件代理髮送basic.cancel以取消消費者的狀況下不會受到消息中間件代理的basic.cancel回覆。
channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("收到來自消息中間件代理的basic.cancel-ok回覆,consumerTag=" + consumerTag);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("收到來自消息中間件代理的basic.cancel回覆,consumerTag=" + consumerTag);
}
});
複製代碼
參考資料: www.cnblogs.com/xishuai/p/r… www.ywnds.com/?p=4741 blog.csdn.net/zhu_tianwei… 對異常狀況解釋的比較多
rabbitmq的元數據
a. 隊列元數據:隊列名稱和它的屬性; b. 交換器元數據:交換器名稱、類型和屬性; c. 綁定元數據:一張簡單的表格展現瞭如何將消息路由到隊列; d. vhost元數據:爲vhost內的隊列、交換器和綁定提供命名空間和安全屬性; e. 元數據信息須要保存的磁盤中,因此每一個集羣中至少須要一個disk節點
所以,當用戶訪問其中任何一個RabbitMQ節點時,經過rabbitmqctl查詢到的queue/user/exchange/vhost等信息都是相同的。
保存數據到磁盤和內存中,若是集羣中羣都是內存節點,那就不能中止他們,不然元數據就會丟。
RabbitMQ只要求集羣中至少有一個磁盤節點,若是隻有一個磁盤節點,恰好又崩潰了,集羣能夠繼續路由消息,但不能建立隊列、交換器、綁定、添加用戶、更改權限等操做。因此,
建議設置兩個磁盤節點
,當內存節點重啓後,會鏈接到預先配置的磁盤節點,下載當前集羣元數據拷貝,因此要將全部磁盤節點告訴內存節點。
數據只保存到內存中,除非遇到
內存節點的特色就是執行效率高
不是每一個節點都有全部隊列的徹底拷貝,若是在集羣中建立隊列,只會在單個節點上建立完整的隊列信息(元數據、狀態、內容),全部其餘節點只知道隊列的元數據和指向該隊列的節點指針。
既然一個隊列的數據只存在一個節點上,那麼在鏈接集羣內其餘節點的時候,是如何進行發佈消息和消費消息的呢?
若是消息生產者所鏈接的是節點2或者節點3,此時隊列1的完整數據不在該兩個節點上,那麼在發送消息過程當中這兩個節點主要起了一個路由轉發做用,根據這兩個節點上的元數據(也就是上文提到的:指向queue的owner node的指針)轉發至節點1上,最終發送的消息仍是會存儲至節點1的隊列1上。 一樣,若是消息消費者所鏈接的節點2或者節點3,那這兩個節點也會做爲路由節點起到轉發做用,將會從節點1的隊列1中拉取消息進行消費。
若是節點崩潰了,附加在隊列上的消費者也就沒法接收新的消息了。可讓消費者重連到集羣並從新建立隊列,這種作法僅當隊列沒設置持久化時纔可行,若是作了隊列持久化或消息持久化,必須等到對應的節點恢復了才能被消費
,這是爲了確保當失敗的節點恢復後加入集羣,節點上的隊列消息不會丟失。
爲何不將隊列內容和狀態複製到全部節點:
優勢:
缺點:
把須要的隊列作成鏡像隊列,存在於多個節點,屬於RabbitMQ的HA方案
根據策略能夠爲節點定義鏡像節點,鏡像節點之間能夠實現隊列中消息實體的同步。 對於發送方確認消息,Rabbit會在全部隊列和隊列的從拷貝安全地接收到消息時,纔會通知發送方。
優勢:
缺點: