爲何要使用MQ消息中間件?它解決了什麼問題?關於爲何要使用消息中間件?消息中間件是如何作到同步變異步、流量削鋒、應用解耦的?網上已經有不少說明,我這裏就再也不說明。我在接下來的RabbitMq系列博客裏會將官方的講解翻譯過來,同時加以本身的理解整理成博客,但願能和你們共同交流,一塊兒進步。java
RabbitMq原理圖程序員
RabbitMq是一個消息中間件:它接收消息、轉發消息。你能夠把它理解爲一個郵局:當你向郵箱裏寄出一封信後,郵遞員們就能最終將信送到收信人手中。相似的,RabbitMq就比如是一個郵箱、郵局和郵遞員。RabbitMq和郵局最大的區別是:RabbitMq接收、轉發的都是二進制數據塊--消息,而不是紙質的數據文件。spring
RabbitMq、消息相關術語以下:api
生產者:生產者只發送消息,發送消息的程序即爲生產者:數組
消息隊列:消息隊列就至關於RabbitMq中的郵箱名稱。儘管消息在你的程序和RabbitMq中流動,但它只能存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決於主機的內存和磁盤限制。多個生產者能夠往同一個消息隊列中發送消息;多個消費者能夠從同一個隊列中獲取數據。咱們如下列圖形來表示一個消息隊列:緩存
消費者:消費者是一個等待接收消息的程序:服務器
注意:生產者、消費者和RabbitMq能夠在不一樣的機器上;在不少的應用中,一個生產者同時也多是消費者。框架
在這小節裏,咱們將寫一個消息生產者用來發送消息、一個消息消費者來消費消息(接收消息並打印出來)。異步
在下面圖形中,「P」是咱們的生產者,「C」是咱們的消費者,中間的紅框是咱們的消息隊列,保存了從生產者那裏接收到的準備轉發到消費方的消息。maven
RabbitMq使用多種協議,本指南使用AMQP 0-9-1協議,該協議是一個開源的、通用的消息協議。RabbitMq有多種語言的客戶端,這裏咱們使用JAVA語言的客戶端作實驗。經過如下地址下載RabbitMq客戶端jar包和依賴包:
把這三個jar包拷貝到你的工做目錄,包括後面教程要新建的java文件。
生產者鏈接RabbitMq,發送一條簡單的消息」Hello World!「後就退出。
在Send.java類中,須要引入如下依賴包:
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.Channel;
給隊列起個名字:
1 public class Send { 2 private final static String QUEUE_NAME = "hello"; 3 public static void main(String[] argv) throws Exception { 4 ... 5 } 6 }
建立鏈接到服務器的鏈接Collection:
1 onnectionFactory factory = new ConnectionFactory(); 2 factory.setHost("localhost"); 3 try (Connection connection = factory.newConnection(); 4 Channel channel = connection.createChannel()) { 5 6 }
這個鏈接即套接字鏈接,爲咱們處理協議版本協商和身份驗證等。這裏咱們鏈接一個本地的RabbitMq:所以是localhost,若是你想要鏈接一個遠程機器上的RabbitMq,只須要把localhst改爲那臺機器的計算機名或是IP地址。
建立完鏈接以後,咱們繼續建立一個信道:Channel。咱們須要使用try-with-resource表達式,由於Connection和Channel都實現了JAVA接口Closeable,屬於資源,須要關閉,這樣咱們就不須要顯示地在咱們的代碼中進行關閉了。(關於信道,請參考文章最頂部的RabbitMq原理圖,是TCP裏面的虛擬連接,例如:電纜至關於一個TCP,信道就是裏面的一個獨立光纖,一條TCP上面建立多條信道是沒有問題的;TCP一旦打開就分建立AMQP信道;不管是發佈消息、接收消息、訂閱隊列,這些動做都是經過信道完成的)。
爲了發送消息,咱們還必需要定義一個須要發送到的消息隊列,這些都要使用try-with-resource表達式:
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 2 String message = "Hello World!"; 3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 4 System.out.println(" [x] Sent '" + message + "'");
定義一個消息隊列是冪等的:只有該隊列不存在的時候才能被建立,消息是二進制數組,所以你能夠根據須要指定編碼。
完成的Send.java以下:
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 public class Send { 6 7 private final static String QUEUE_NAME = "hello"; 8 9 public static void main(String[] argv) throws Exception { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 try (Connection connection = factory.newConnection(); 13 Channel channel = connection.createChannel()) { 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 String message = "Hello World!"; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); 17 System.out.println(" [x] Sent '" + message + "'"); 18 } 19 } 20 }
消費者監聽RabbitMq中的消息,所以與生產者發送一條消息就退出不一樣,消費者要保持運行狀態來接收消息並打印出來。
Recv.java一樣須要導入如下依賴包:
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 import com.rabbitmq.client.DeliverCallback;
與生產者相同,咱們須要建立Connetcion和Channel、定義隊列(須要監聽並接收消息的隊列):
1 public class Recv { 2 3 private final static String QUEUE_NAME = "hello"; 4 5 public static void main(String[] argv) throws Exception { 6 ConnectionFactory factory = new ConnectionFactory(); 7 factory.setHost("localhost"); 8 Connection connection = factory.newConnection(); 9 Channel channel = connection.createChannel(); 10 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 13 14 } 15 }
注意咱們也在這裏聲明隊列,由於咱們可能在生產者以前啓動消費者,咱們想要確保在咱們嘗試消費消息的時候隊列就已經存在了。
這裏咱們爲何不使用try-with-resource表達式自動關閉channl和connection?經過這樣,咱們就可使咱們的程序一直保持運行狀態,若是把這些關了,程序也就中止了。這就尷尬了,由於咱們須要保持消費者一直處於異步監聽消息過來的狀態。
RabbitMq會將隊列中的消息異步地推送過來,咱們須要提供一個回調函數來緩存消息直到咱們須要用到這些消息:
1 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 2 String message = new String(delivery.getBody(), "UTF-8"); 3 System.out.println(" [x] Received '" + message + "'"); 4 }; 5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
Rec.java完整代碼:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
在官方手冊中,測試部分他們是將客戶端jar和依賴jar添加到classpath路徑,而後在cmd終端來運行的,我以爲麻煩,所以,我這裏放到IDEA中來運行,效果是同樣的。
第一步:首先運行Send.java:
輸出結果:
[x] Sent 'Hello World!'
查看RabbitMq控制檯:
說明消息已經發送成功。
第二步:啓動消費者Recv.java:
輸出結果:
[x] Received 'Hello World!'
說明消息已經消費成功了,此時再查看控制檯:
消息依然存在在隊列中,可是區別是,在第一張圖中Ready由1變成了0,Unacknowledged由0變成了1;第二張圖中Ready也由1變成0,Unacked由0變成了1。爲何會這樣?按道理,消息消費了以後就應該刪除掉,不然可能形成重複消費。關於這方面知識,將會在後面的章節中再介紹(Ack機制)。
上面雖然實現了功能,但在實際工做中,咱們更多的多是使用SpringBoot、SpringCloud等成熟的框架來實現。本小節就經過SpringBoot來實現以上功能。
建立工程的時候選擇RabbitMq:
工程目錄以下:
Provider和Consumer的配置文件相同,IP請替換成你本身的:
1 #RabbitMq 2 spring.rabbitmq.host=192.168.xx.xx 3 spring.rabbitmq.username=rabbitmq 4 spring.rabbitmq.password=123456 5 6 hello_world.queue=hello
爲方便讓系統啓動時就往隊列發送消息,因此寫了一個SenderRunner類:
1 @Component 2 public class SenderRunner implements ApplicationRunner { 3 4 @Autowired 5 private Send send; 6 7 @Override 8 public void run(ApplicationArguments args) throws Exception { 9 send.doSender("Hello RabbitMq"); 10 } 11 }
Send.java
1 @Component 2 public class Send { 3 4 @Value("${hello_world.queue}") 5 private String queueName; 6 7 @Autowired 8 private AmqpTemplate amqpTemplate; 9 10 public void doSender(String msg) { 11 12 amqpTemplate.convertAndSend(queueName,msg); 13 System.out.println("發送消息:" + msg); 14 } 15 }
啓動類:
1 @SpringBootApplication 2 public class ProviderApplication { 3 public static void main(String[] args) { 4 SpringApplication.run(ProviderApplication.class, args); 5 } 6 }
Recv.java
@Component public class Recv { @RabbitListener(queues = "${hello_world.queue}") public void receive(String msg) { System.out.println("接收到消息:" + msg); } }
啓動Provider:
查看控制檯:
啓動Consumer:
可見,SpringBoot爲咱們作了不少封裝,隱藏了不少底層的細節,使用起來簡單多了。