爲何要使用MQ消息中間件?它解決了什麼問題?關於爲何要使用消息中間件?消息中間件是如何作到同步變異步、流量削鋒、應用解耦的?網上已經有不少說明,我這裏就再也不說明。我在接下來的RabbitMq系列博客裏會將官方的講解翻譯過來,同時加以本身的理解整理成博客,但願能和你們共同交流,一塊兒進步。java
RabbitMq原理圖spring
RabbitMq是一個消息中間件:它接收消息、轉發消息。你能夠把它理解爲一個郵局:當你向郵箱裏寄出一封信後,郵遞員們就能最終將信送到收信人手中。相似的,RabbitMq就比如是一個郵箱、郵局和郵遞員。RabbitMq和郵局最大的區別是:RabbitMq接收、轉發的都是二進制數據塊--消息,而不是紙質的數據文件。api
RabbitMq、消息相關術語以下:數組
生產者:生產者只發送消息,發送消息的程序即爲生產者:緩存
消息隊列:消息隊列就至關於RabbitMq中的郵箱名稱。儘管消息在你的程序和RabbitMq中流動,但它只能存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決於主機的內存和磁盤限制。多個生產者能夠往同一個消息隊列中發送消息;多個消費者能夠從同一個隊列中獲取數據。咱們如下列圖形來表示一個消息隊列:bash
消費者:消費者是一個等待接收消息的程序:服務器
注意:生產者、消費者和RabbitMq能夠在不一樣的機器上;在不少的應用中,一個生產者同時也多是消費者。微信
在這小節裏,咱們將寫一個消息生產者用來發送消息、一個消息消費者來消費消息(接收消息並打印出來)。框架
在下面圖形中,「P」是咱們的生產者,「C」是咱們的消費者,中間的紅框是咱們的消息隊列,保存了從生產者那裏接收到的準備轉發到消費方的消息。異步
RabbitMq使用多種協議,本指南使用AMQP 0-9-1協議,該協議是一個開源的、通用的消息協議。RabbitMq有多種語言的客戶端,這裏咱們使用JAVA語言的客戶端作實驗。經過如下地址下載RabbitMq客戶端jar包和依賴包:
把這三個jar包拷貝到你的工做目錄,包括後面教程要新建的java文件。
生產者鏈接RabbitMq,發送一條簡單的消息」Hello World!「後就退出。
在Send.java類中,須要引入如下依賴包:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
複製代碼
給隊列起個名字:
1 public class Send {
2 private final static String QUEUE_NAME = "hello";
3 public static void main(String[] argv) throws Exception {
4 ...
5 }
6 }
複製代碼
建立鏈接到服務器的鏈接Collection:
1 onnectionFactory factory = new ConnectionFactory();
2 factory.setHost("localhost");
3 try (Connection connection = factory.newConnection();
4 Channel channel = connection.createChannel()) {
5
6 }
複製代碼
這個鏈接即套接字鏈接,爲咱們處理協議版本協商和身份驗證等。這裏咱們鏈接一個本地的RabbitMq:所以是localhost,若是你想要鏈接一個遠程機器上的RabbitMq,只須要把localhst改爲那臺機器的計算機名或是IP地址。
建立完鏈接以後,咱們繼續建立一個信道:Channel。咱們須要使用try-with-resource表達式,由於Connection和Channel都實現了JAVA接口Closeable,屬於資源,須要關閉,這樣咱們就不須要顯示地在咱們的代碼中進行關閉了。(關於信道,請參考文章最頂部的RabbitMq原理圖,是TCP裏面的虛擬連接,例如:電纜至關於一個TCP,信道就是裏面的一個獨立光纖,一條TCP上面建立多條信道是沒有問題的;TCP一旦打開就分建立AMQP信道;不管是發佈消息、接收消息、訂閱隊列,這些動做都是經過信道完成的)。
爲了發送消息,咱們還必需要定義一個須要發送到的消息隊列,這些都要使用try-with-resource表達式:
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2 String message = "Hello World!";
3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
4 System.out.println(" [x] Sent '" + message + "'");
複製代碼
定義一個消息隊列是冪等的:只有該隊列不存在的時候才能被建立,消息是二進制數組,所以你能夠根據須要指定編碼。
完成的Send.java以下:
1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4
5 public class Send {
6
7 private final static String QUEUE_NAME = "hello";
8
9 public static void main(String[] argv) throws Exception {
10 ConnectionFactory factory = new ConnectionFactory();
11 factory.setHost("localhost");
12 try (Connection connection = factory.newConnection();
13 Channel channel = connection.createChannel()) {
14 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 String message = "Hello World!";
16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
17 System.out.println(" [x] Sent '" + message + "'");
18 }
19 }
20 }
複製代碼
消費者監聽RabbitMq中的消息,所以與生產者發送一條消息就退出不一樣,消費者要保持運行狀態來接收消息並打印出來。
Recv.java一樣須要導入如下依賴包:
1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4 import com.rabbitmq.client.DeliverCallback;
複製代碼
與生產者相同,咱們須要建立Connetcion和Channel、定義隊列(須要監聽並接收消息的隊列):
1 public class Recv {
2
3 private final static String QUEUE_NAME = "hello";
4
5 public static void main(String[] argv) throws Exception {
6 ConnectionFactory factory = new ConnectionFactory();
7 factory.setHost("localhost");
8 Connection connection = factory.newConnection();
9 Channel channel = connection.createChannel();
10
11 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
13
14 }
15 }
複製代碼
注意咱們也在這裏聲明隊列,由於咱們可能在生產者以前啓動消費者,咱們想要確保在咱們嘗試消費消息的時候隊列就已經存在了。
這裏咱們爲何不使用try-with-resource表達式自動關閉channl和connection?經過這樣,咱們就可使咱們的程序一直保持運行狀態,若是把這些關了,程序也就中止了。這就尷尬了,由於咱們須要保持消費者一直處於異步監聽消息過來的狀態。
RabbitMq會將隊列中的消息異步地推送過來,咱們須要提供一個回調函數來緩存消息直到咱們須要用到這些消息:
1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
2 String message = new String(delivery.getBody(), "UTF-8");
3 System.out.println(" [x] Received '" + message + "'");
4 };
5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
複製代碼
Rec.java完整代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
複製代碼
在官方手冊中,測試部分他們是將客戶端jar和依賴jar添加到classpath路徑,而後在cmd終端來運行的,我以爲麻煩,所以,我這裏放到IDEA中來運行,效果是同樣的。
第一步:首先運行Send.java:
輸出結果:
[x] Sent 'Hello World!'
複製代碼
查看RabbitMq控制檯:
說明消息已經發送成功。
第二步:啓動消費者Recv.java:
輸出結果:
[x] Received 'Hello World!'
複製代碼
說明消息已經消費成功了,此時再查看控制檯:
消息依然存在在隊列中,可是區別是,在第一張圖中Ready由1變成了0,Unacknowledged由0變成了1;第二張圖中Ready也由1變成0,Unacked由0變成了1。爲何會這樣?按道理,消息消費了以後就應該刪除掉,不然可能形成重複消費。關於這方面知識,將會在後面的章節中再介紹(Ack機制)。
上面雖然實現了功能,但在實際工做中,咱們更多的多是使用SpringBoot、SpringCloud等成熟的框架來實現。本小節就經過SpringBoot來實現以上功能。
建立工程的時候選擇RabbitMq:
工程目錄以下:
Provider和Consumer的配置文件相同,IP請替換成你本身的:
1 #RabbitMq
2 spring.rabbitmq.host=192.168.xx.xx
3 spring.rabbitmq.username=rabbitmq
4 spring.rabbitmq.password=123456
5
6 hello_world.queue=hello
複製代碼
爲方便讓系統啓動時就往隊列發送消息,因此寫了一個SenderRunner類:
1 @Component
2 public class SenderRunner implements ApplicationRunner {
3
4 @Autowired
5 private Send send;
6
7 @Override
8 public void run(ApplicationArguments args) throws Exception {
9 send.doSender("Hello RabbitMq");
10 }
11 }
複製代碼
Send.java
1 @Component
2 public class Send {
3
4 @Value("${hello_world.queue}")
5 private String queueName;
6
7 @Autowired
8 private AmqpTemplate amqpTemplate;
9
10 public void doSender(String msg) {
11
12 amqpTemplate.convertAndSend(queueName,msg);
13 System.out.println("發送消息:" + msg);
14 }
15 }
複製代碼
啓動類:
1 @SpringBootApplication
2 public class ProviderApplication {
3 public static void main(String[] args) {
4 SpringApplication.run(ProviderApplication.class, args);
5 }
6 }
複製代碼
Recv.java
@Component
public class Recv {
@RabbitListener(queues = "${hello_world.queue}")
public void receive(String msg) {
System.out.println("接收到消息:" + msg);
}
}
複製代碼
啓動Provider:
查看控制檯:
啓動Consumer:
可見,SpringBoot爲咱們作了不少封裝,隱藏了不少底層的細節,使用起來簡單多了。(未完待續......)