人生終將是場單人旅途,孤獨以前是迷茫,孤獨事後是成長。html
先給你們說聲抱歉,最近一週都沒有發文,有一些比較要緊重要的事須要處理。java
今天正好得空,原本說準備寫SpringIOC
相關的東西,可是發現想要梳理一遍仍是須要不少時間,因此我打算慢慢寫,先把MQ給寫了,再慢慢寫其餘相關的,畢竟偏理論的東西一遍要比較難寫,像MQ這種偏實戰的你們能夠clone代碼去玩一玩,仍是比較方便的。git
同時MQ也是Java進階沒必要可少的技術棧之一,因此Java開發從業者對它是必需要了解的。程序員
如今市面上有三種消息隊列比較火分別是:RabbitMQ
,RocketMQ
和Kafka
。github
今天要講的消息隊列中我會以RabbitMQ
做爲案例來入門,由於SpringBoot的amqp中默認只集成了RabbitMQ
,用它來說會方便許多,且RabbitMQ
的性能和穩定性都很不錯,是一款通過時間考驗的開源組件。面試
祝有好收穫。spring
消息隊列(MQ)全稱爲Message Queue,是一種應用程序對應用程序的通訊方法。數組
翻譯一下就是:在應用之間放一個消息組件,而後應用雙方經過這個消息組件進行通訊。瀏覽器
好端端的爲啥要在中間放個組件呢?
小系統實際上是用不到消息隊列的,通常分佈式系統纔會引入消息隊列,由於分佈式系統須要抗住高併發,須要多系統解耦,更須要對用戶比較友好的響應速度,而消息隊列的特性能夠自然解耦,方便異步更能起到一個頂住高併發的削峯做用,完美解決上面的三個問題。
然萬物抱陽負陰,系統之間忽然加了箇中間件,提升系統複雜度的同時也增長了不少問題:
這些都是使用消息隊列過程當中須要思考須要考慮的地方,消息隊列能給你帶來很大的便利,也能給你帶來一些對應的麻煩。
上面說了消息隊列帶來的好處以及問題,而這些不在咱們今天這篇的討論範圍以內,我打算以後再寫這些,咱們今天要作的是搭建出一個消息隊列環境,讓你們感覺一下基礎的發消息與消費消息,更高級的問題會放在之後討論。
RabbitMQ是一個消息組件,是一個erlang開發的AMQP(Advanced Message Queue)的開源實現。
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。
RabbitMQ採用了AMQP協議,至於這協議怎麼怎麼樣,咱們關心的是RabbitMQ
結構如何且怎麼用。
仍是那句話,學東西須要先觀其大貌,咱們要用RabbitMQ首先要知道它總體是怎麼樣,這樣纔有利於咱們接下來的學習。
咱們先來看看我剛畫的架構圖,由於RabbitMQ實現了AMQP協議,因此這些概念也是AMQP中共有的。
Broker
: 中間件自己。接收和分發消息的應用,這裏指的就是RabbitMQ Server。
Virtual host
: 虛擬主機。出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,相似於網絡中的namespace概念。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange/queue等。
Connection
: 鏈接。publisher/consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。
Channel
: 渠道。若是每一次訪問RabbitMQ都創建一個Connection,在消息量大的時候創建TCP Connection的開銷會比較大且效率也較低。Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。
Exchange
: 路由。根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。
Queue
: 消息的隊列。消息最終被送到這裏等待消費,一個message能夠被同時拷貝到多個queue中。
Binding
: 綁定。exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。
看完了這些概念,我再給你們梳理一遍其流程:
當咱們的生產者端往Broker(RabbitMQ)
中發送了一條消息,Broker
會根據其消息的標識送往不一樣的Virtual host
,而後Exchange
會根據消息的路由key
和交換器類型將消息分發到本身所屬的Queue
中去。
而後消費者端會經過Connection
中的Channel
獲取剛剛推送的消息,拉取消息進行消費。
Tip:某個Exchange
有哪些屬於本身的Queue
,是由Binding
綁定關係決定的。
上面講了RabbitMQ
大概的結構圖和一個消息的運行流程,講完了理論,這裏咱們就準備實操一下吧,先進行RabbitMQ安裝。
官網下載地址:www.rabbitmq.com/download.ht…
因爲我尚未屬於本身MAC電腦,因此這裏的演示就按照Windows的來了,不過你們都是程序員,安裝個東西總歸是難不倒你們的吧😂
Windows下載地址:www.rabbitmq.com/install-win…
進去以後能夠直接找到Direct Downloads,下載相關EXE程序進行安裝就能夠了。
因爲RabbitMQ
是由erlang語言編寫的,因此安裝以前咱們還須要安裝erlang環境,你下載RabbitMQ
以後直接點擊安裝,若是沒有相關環境,安裝程序會提示你,而後會讓你的瀏覽器打開erlang的下載頁面,在這個頁面上根據本身的系統類型點擊下載安裝便可,安裝完畢後再去安裝RabbitMQ
。
這二者的安裝都只須要一直NEXT
下一步就能夠了。
安裝完成以後能夠按一下Windows
鍵看到效果以下:
Tip:其中Rabbit-Command後面會用到,是RabbitMQ的命令行操做臺。
安裝完RabbitMQ
咱們須要對咱們的開發環境也導入RabbitMQ
相關的JAR包。
爲了方便起見,咱們能夠直接使用Spring-boot-start
的方式導入,這裏面也會包含全部咱們須要用到的RabbitMQ
相關的JAR包。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
複製代碼
直接引入spring-boot-starter-amqp
便可。
搭建好環境以後,咱們就能夠上手了。
考慮到這是一個入門文章,讀者不少可能沒有接觸過RabbitMQ
,直接使用自動配置的方式可能會令你們很迷惑,由於自動配置會屏蔽不少細節,致使你們只看到了被封裝後的樣子,不利於你們理解。
因此在本節Hello World
這裏,我會直接使用最原始的鏈接方式就行演示,讓你們看到最原始的鏈接的樣子。
Tip:這種方式演示的代碼我都在放在prototype
包下面。
先來看看生產者代碼,也就是咱們push消息的代碼:
public static final String QUEUE_NAME = "erduo";
// 建立鏈接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 鏈接到本地server
connectionFactory.setHost("127.0.0.1");
// 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
// 經過鏈接建立通道
Channel channel = connection.createChannel();
// 建立一個名爲耳朵的隊列,該隊列非持久(RabbitMQ重啓後會消失)、非獨佔(非僅用於此連接)、非自動刪除(服務器將再也不使用的隊列刪除)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();
// 發佈消息
// 四個參數爲:指定路由器,指定key,指定參數,和二進制數據內容
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生產者發送消息結束,發送內容爲:" + msg);
channel.close();
connection.close();
複製代碼
代碼我都給了註釋,可是我仍是要給你們講解一遍,梳理一下。
先經過RabbitMQ
中的ConnectionFactory
配置一下將要鏈接的server-host,而後建立一個新鏈接,再經過此鏈接建立通道(Channel
),經過這個通道建立隊列和發送消息。
這裏看上去仍是很好理解的,我須要把建立隊列和發送消息這裏再拎出來講一下。
建立隊列
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
複製代碼
建立隊列的方法裏面有五個參數,第一個是參數是隊列的名稱,日後的三個參數表明不一樣的配置,最後一個參數是額外參數。
durable:表明是否將此隊列持久化。
exclusive:表明是否獨佔,若是設置爲獨佔隊列則此隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。
autoDelete:表明斷開鏈接後是否自動刪除此隊列。
arguments:表明其餘額外參數。
這些參數中durable常常會用到,它表明了咱們能夠對隊列作持久化,以保證RabbitMQ
宕機恢復後此隊列也能夠自行恢復。
發送消息
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
複製代碼
發送消息的方法裏是四個參數,第一個是必須的指定exchange,上面的示例代碼中咱們傳入了一個空字符串,這表明咱們交由默認的匿名exchange去幫咱們路由消息。
第二個參數是路由key,exchange會根據此key對消息進行路由轉發,第三個參數是額外參數,講消息持久化時會用到一下,最後一個參數就是咱們要發送的數據了,須要將咱們的數據轉成字節數組的方式傳入。
測試
講完了這些API以後,咱們能夠測試一下咱們的代碼了,run一下以後,會在控制檯打出以下:
這樣以後咱們就把消息發送到了RabbitMQ
中去,此時能夠打開RabbitMQ控制檯(前文安裝時提到過)去使用命令rabbitmqctl.bat list_queues
去查看消息隊列如今的狀況:
能夠看到有一條message
在裏面,這就表明咱們的消息已經發送成功了,接下來咱們能夠編寫一個消費者對裏面的message
進行消費了。
消費者代碼和生產者的差很少,都須要創建鏈接創建通道:
// 建立鏈接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 鏈接到本地server
connectionFactory.setHost("127.0.0.1");
// 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
// 經過鏈接建立通道
Channel channel = connection.createChannel();
// 建立消費者,阻塞接收消息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------------------------------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("exchangeName : " + envelope.getExchange());
System.out.println("routingKey : " + envelope.getRoutingKey());
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消息內容 : " + msg);
}
};
// 啓動消費者消費指定隊列
channel.basicConsume(Producer.QUEUE_NAME, consumer);
// channel.close();
// connection.close();
複製代碼
創建完通道以後,咱們須要建立一個消費者對象,而後用這個消費者對象去消費指定隊列中的消息。
這個示例中咱們就是新建了一個consumer
,而後用它去消費隊列-erduo
中的消息。
最後兩句代碼我給註釋掉了,由於一旦把鏈接也關閉了,那咱們的消費者就不能保持消費狀態了,因此要開着鏈接,監聽此隊列。
ok,運行這段程序,而後咱們的消費者會去隊列-erduo
拿到裏面的消息,效果以下:
consumerTag:是這個消息的標識。
exchangeName:是這個消息所發送exchange的名字,咱們先前傳入的是空字符串,因此這裏也是空字符串。
exchangeName:是這個消息所發送路由key。
這樣咱們的程序就處在一個監聽的狀態下,你再次調用生產者發送消息消費者就會實時的在控制上打印消息內容。
上面咱們演示了生產者和消費者,咱們生產者發送一條消息,消費者消費一條信息,這個時候咱們的RabbitMQ
應該有多少消息?
理論上來講發送一條,消費一條,如今裏面應該是0纔對,可是如今的狀況並非:
消息隊列裏面仍是有1條信息,咱們重啓一下消費者,又打印了一遍咱們消費過的那條消息,經過消息上面的時間咱們能夠看出來仍是當時咱們發送的那條信息,也就是說咱們消費者消費過了以後這條信息並無被刪除。
這種情況出現的緣由是由於RabbitMQ
消息接收確認機制,也就是說一條信息被消費者接收到了以後,須要進行一次確認操做,這條消息纔會被刪除。
RabbitMQ
中默認消費確認是手動的,也能夠將其設置爲自動刪除,自動刪除模式消費者接收到消息以後就會自動刪除這條消息,若是消息處理過程當中發生了異常,這條消息就等於沒被處理完可是也被刪除掉了,因此這裏咱們會一直使用手動確認模式。
消息接受確認(ACK)的代碼很簡單,只要在原來消費者的代碼里加上一句就能夠了:
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------------------------------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("exchangeName : " + envelope.getExchange());
System.out.println("routingKey : " + envelope.getRoutingKey());
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消息內容 : " + msg);
// 消息確認
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("消息已確認");
}
};
複製代碼
咱們將代碼改爲如此以後,能夠再run一次消費者,能夠看看效果:
再來看看RabbitMQ
中的隊列狀況:
從圖中咱們能夠看出消息消費後已經成功被刪除了,其實大膽猜一猜,自動刪除應該是在咱們的代碼還沒執行以前就幫咱們返回了確認,因此這就致使了消息丟失的可能性。
咱們採用手動確認的方式以後,能夠先將邏輯處理完畢以後(可能出現異常的地方能夠try-catch
起來),把手動確認的代碼放到最後一行,這樣若是出現異常狀況致使這條消息沒有被確認,那麼這條消息會在以後被從新消費一遍。
今天的內容就到這裏,下一篇將會咱們將會撇棄傳統的手動創建鏈接的方式進行發消息收消息,而轉用Spring幫咱們定義好的註解和Spring提供的RabbitTemplate,更方便的收發消息。
消息隊列呢,其實用法都是同樣的,只是各個開源消息隊列的側重點稍有不一樣,咱們應該根據咱們本身的項目需求來決定咱們應該選取什麼樣的消息隊列來爲咱們的項目服務,這個項目選型的工做通常都是開發組長幫大家作了,通常是輪不到咱們來作的,可是面試的時候可能會考察相關知識,因此這幾種消息隊列咱們都應該有所涉獵。
好了,以上就是本期的所有內容,感謝你能看到這裏,歡迎對本文點贊收藏與評論,👍大家的每一個點贊都是我創做的最大動力。
我是耳朵,一個一直想作知識輸出的僞文藝程序員,咱們下期見。