RabbitMQ指南之一:"Hello World!"

  爲何要使用MQ消息中間件?它解決了什麼問題?關於爲何要使用消息中間件?消息中間件是如何作到同步變異步、流量削鋒、應用解耦的?網上已經有不少說明,我這裏就再也不說明。我在接下來的RabbitMq系列博客裏會將官方的講解翻譯過來,同時加以本身的理解整理成博客,但願能和你們共同交流,一塊兒進步。java

  

                  RabbitMq原理圖spring

一、RabbitMq簡介

  RabbitMq是一個消息中間件:它接收消息、轉發消息。你能夠把它理解爲一個郵局:當你向郵箱裏寄出一封信後,郵遞員們就能最終將信送到收信人手中。相似的,RabbitMq就比如是一個郵箱、郵局和郵遞員。RabbitMq和郵局最大的區別是:RabbitMq接收、轉發的都是二進制數據塊--消息,而不是紙質的數據文件。api

  RabbitMq、消息相關術語以下:數組

  生產者:生產者只發送消息,發送消息的程序即爲生產者:緩存

  

  消息隊列:消息隊列就至關於RabbitMq中的郵箱名稱。儘管消息在你的程序和RabbitMq中流動,但它只能存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決於主機的內存和磁盤限制。多個生產者能夠往同一個消息隊列中發送消息;多個消費者能夠從同一個隊列中獲取數據。咱們如下列圖形來表示一個消息隊列:bash

  

  消費者:消費者是一個等待接收消息的程序:服務器

  

  注意:生產者、消費者和RabbitMq能夠在不一樣的機器上;在不少的應用中,一個生產者同時也多是消費者。微信

二、「Hello World!」

  在這小節裏,咱們將寫一個消息生產者用來發送消息、一個消息消費者來消費消息(接收消息並打印出來)。框架

  在下面圖形中,「P」是咱們的生產者,「C」是咱們的消費者,中間的紅框是咱們的消息隊列,保存了從生產者那裏接收到的準備轉發到消費方的消息。異步

  

  Java客戶端類庫說明:

  RabbitMq使用多種協議,本指南使用AMQP 0-9-1協議,該協議是一個開源的、通用的消息協議。RabbitMq有多種語言的客戶端,這裏咱們使用JAVA語言的客戶端作實驗。經過如下地址下載RabbitMq客戶端jar包和依賴包:

  amqp-client-5.5.1.jar

  slf4j-api-1.7.25.jar

  slf4j-simple-1.7.25.jar

  把這三個jar包拷貝到你的工做目錄,包括後面教程要新建的java文件。

2.1 發送消息

  生產者鏈接RabbitMq,發送一條簡單的消息」Hello World!「後就退出。

  在Send.java類中,須要引入如下依賴包:

1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
複製代碼

  給隊列起個名字:

1 public class Send {
2   private final static String QUEUE_NAME = "hello";
3   public static void main(String[] argv) throws Exception {
4       ...
5   }
6 }
複製代碼

  建立鏈接到服務器的鏈接Collection:

1 onnectionFactory factory = new ConnectionFactory();
2 factory.setHost("localhost");
3 try (Connection connection = factory.newConnection();
4      Channel channel = connection.createChannel()) {
5 
6 }
複製代碼

  這個鏈接即套接字鏈接,爲咱們處理協議版本協商和身份驗證等。這裏咱們鏈接一個本地的RabbitMq:所以是localhost,若是你想要鏈接一個遠程機器上的RabbitMq,只須要把localhst改爲那臺機器的計算機名或是IP地址。

  建立完鏈接以後,咱們繼續建立一個信道:Channel。咱們須要使用try-with-resource表達式,由於Connection和Channel都實現了JAVA接口Closeable,屬於資源,須要關閉,這樣咱們就不須要顯示地在咱們的代碼中進行關閉了。(關於信道,請參考文章最頂部的RabbitMq原理圖,是TCP裏面的虛擬連接,例如:電纜至關於一個TCP,信道就是裏面的一個獨立光纖,一條TCP上面建立多條信道是沒有問題的;TCP一旦打開就分建立AMQP信道;不管是發佈消息、接收消息、訂閱隊列,這些動做都是經過信道完成的)。

  爲了發送消息,咱們還必需要定義一個須要發送到的消息隊列,這些都要使用try-with-resource表達式:

1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2 String message = "Hello World!";
3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
4 System.out.println(" [x] Sent '" + message + "'");
複製代碼

  定義一個消息隊列是冪等的:只有該隊列不存在的時候才能被建立,消息是二進制數組,所以你能夠根據須要指定編碼。

  完成的Send.java以下:

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6 
 7     private final static String QUEUE_NAME = "hello";
 8 
 9     public static void main(String[] argv) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("localhost");
12         try (Connection connection = factory.newConnection();
13              Channel channel = connection.createChannel()) {
14             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15             String message = "Hello World!";
16             channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
17             System.out.println(" [x] Sent '" + message + "'");
18         }
19     }
20 }
複製代碼

2.2 接收消息

  消費者監聽RabbitMq中的消息,所以與生產者發送一條消息就退出不一樣,消費者要保持運行狀態來接收消息並打印出來。

  Recv.java一樣須要導入如下依賴包:

1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4 import com.rabbitmq.client.DeliverCallback;
複製代碼

  與生產者相同,咱們須要建立Connetcion和Channel、定義隊列(須要監聽並接收消息的隊列):

1 public class Recv {
 2 
 3   private final static String QUEUE_NAME = "hello";
 4 
 5   public static void main(String[] argv) throws Exception {
 6     ConnectionFactory factory = new ConnectionFactory();
 7     factory.setHost("localhost");
 8     Connection connection = factory.newConnection();
 9     Channel channel = connection.createChannel();
10 
11     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
13 
14   }
15 }
複製代碼

  注意咱們也在這裏聲明隊列,由於咱們可能在生產者以前啓動消費者,咱們想要確保在咱們嘗試消費消息的時候隊列就已經存在了。

  這裏咱們爲何不使用try-with-resource表達式自動關閉channl和connection?經過這樣,咱們就可使咱們的程序一直保持運行狀態,若是把這些關了,程序也就中止了。這就尷尬了,由於咱們須要保持消費者一直處於異步監聽消息過來的狀態。

  RabbitMq會將隊列中的消息異步地推送過來,咱們須要提供一個回調函數來緩存消息直到咱們須要用到這些消息:

1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
2     String message = new String(delivery.getBody(), "UTF-8");
3     System.out.println(" [x] Received '" + message + "'");
4 };
5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
複製代碼

  Rec.java完整代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
複製代碼

三、測試

  在官方手冊中,測試部分他們是將客戶端jar和依賴jar添加到classpath路徑,而後在cmd終端來運行的,我以爲麻煩,所以,我這裏放到IDEA中來運行,效果是同樣的。

  

  第一步:首先運行Send.java:

  輸出結果:

[x] Sent 'Hello World!'
複製代碼

  查看RabbitMq控制檯:

  

  說明消息已經發送成功。

   第二步:啓動消費者Recv.java:

  輸出結果:

[x] Received 'Hello World!'
複製代碼

  說明消息已經消費成功了,此時再查看控制檯:

  消息依然存在在隊列中,可是區別是,在第一張圖中Ready由1變成了0,Unacknowledged由0變成了1;第二張圖中Ready也由1變成0,Unacked由0變成了1。爲何會這樣?按道理,消息消費了以後就應該刪除掉,不然可能形成重複消費。關於這方面知識,將會在後面的章節中再介紹(Ack機制)。

四、用SpringBoot實現

  上面雖然實現了功能,但在實際工做中,咱們更多的多是使用SpringBoot、SpringCloud等成熟的框架來實現。本小節就經過SpringBoot來實現以上功能。

  建立工程的時候選擇RabbitMq:

  工程目錄以下:

  Provider和Consumer的配置文件相同,IP請替換成你本身的:

1 #RabbitMq
2 spring.rabbitmq.host=192.168.xx.xx  
3 spring.rabbitmq.username=rabbitmq
4 spring.rabbitmq.password=123456
5 
6 hello_world.queue=hello
複製代碼

  爲方便讓系統啓動時就往隊列發送消息,因此寫了一個SenderRunner類:

1 @Component
 2 public class SenderRunner implements ApplicationRunner {
 3 
 4     @Autowired
 5     private Send send;
 6 
 7     @Override
 8     public void run(ApplicationArguments args) throws Exception {
 9         send.doSender("Hello RabbitMq");
10     }
11 }
複製代碼

  Send.java

1 @Component
 2 public class Send {
 3 
 4     @Value("${hello_world.queue}")
 5     private String queueName;
 6 
 7     @Autowired
 8     private AmqpTemplate amqpTemplate;
 9 
10     public void doSender(String msg) {
11 
12         amqpTemplate.convertAndSend(queueName,msg);
13         System.out.println("發送消息:" + msg);
14     }
15 }
複製代碼

  啓動類:

1 @SpringBootApplication
2 public class ProviderApplication {
3     public static void main(String[] args) {
4         SpringApplication.run(ProviderApplication.class, args);
5     }
6 }
複製代碼

  Recv.java

@Component
public class Recv {

    @RabbitListener(queues = "${hello_world.queue}")
    public void receive(String msg) {
        System.out.println("接收到消息:" + msg);
    }
}
複製代碼

  啓動Provider:

  查看控制檯:

  啓動Consumer:

  可見,SpringBoot爲咱們作了不少封裝,隱藏了不少底層的細節,使用起來簡單多了。(未完待續......)      

歡迎你們關注個人我的微信公衆號,裏面不只有技術分享,還有各類行業趣事,讓您的生活變的豐富多彩

相關文章
相關標籤/搜索