RabbitMQ基礎入門

 

RabbitMQ是一個消息中間件,在一些須要異步處理、發佈/訂閱等場景的時候,使用RabbitMQ能夠完成咱們的需求。 下面是我在學習java語言實現RabbitMQ(自RabbitMQ官網的Tutorials)的一些記錄。html

首先有三個名稱瞭解一下(如下圖片來自rabbitMQ官網)java

  • producer是用戶應用負責發送消息

  • queue是存儲消息的緩衝(buffer)

  • consumer是用戶應用負責接收消息

下面是我使用rabbitMQ原生的jar包作的測試方法數組

maven pom.xml 加入服務器

<dependency>異步

    <groupId>com.rabbitmq</groupId>maven

    <artifactId>amqp-client</artifactId>ide

    <version>3.5.6</version>學習

</dependency>測試

方法實現示意圖fetch

 

發送消息方法(Send.java)

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6      
 7     private static final String QUEUE_NAME = "hello";
 8  
 9     public static void main(String[] args) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("192.168.1.7");
12         factory.setPort(5672);
13         factory.setUsername("admin");
14         factory.setPassword("admin");
15         Connection connection = factory.newConnection();
16         Channel channel = connection.createChannel();
17  
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         String message = "Hello World!";
20         // "" 表示默認exchange
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
22         System.out.println(" [x] Sent '" + message + "'");
23  
24         channel.close();
25         connection.close();
26     }
27  
28 }

10~16行 是獲取rabbitmq.client.Channel, rabbitMQ的API操做基本都是經過channel來完成的。

18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),這裏channel聲明瞭一個名字叫「hello」的queue,聲明queue的操做是冪等的,也就是說只有不存在相同名稱的queue的狀況下才會建立一個新的queue。

21行 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()),chaneel在這個queue裏發佈了消息(字節數組)。

24~25行 則是連接的關閉,注意關閉順序就行了。

接受消息方法 (Recv.java)

 1 import com.rabbitmq.client.AMQP;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 import com.rabbitmq.client.Consumer;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 
 9 import java.io.IOException;
10 
11 public class Recv {
12 
13   private final static String QUEUE_NAME = "hello";
14 
15   public static void main(String[] argv) throws Exception {
16     ConnectionFactory factory = new ConnectionFactory();
17     factory.setHost("192.168.1.7");
18     factory.setPort(5672);
19     factory.setUsername("admin");
20     factory.setPassword("admin");
21     Connection connection = factory.newConnection();
22     Channel channel = connection.createChannel();
23 
24     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
26 
27     Consumer consumer = new DefaultConsumer(channel) {
28       @Override
29       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
30           throws IOException {
31         String message = new String(body, "UTF-8");
32         System.out.println(" [x] Received '" + message + "'");
33       }
34     };
35     channel.basicConsume(QUEUE_NAME, true, consumer);
36   }
37 }

16~22行 和Send類中同樣,也是獲取同一個rabbitMQ服務的channel,這也是能接受到消息的基礎。

24行 一樣聲明瞭一個和Send類中發佈的queue相同的queue。

27~35行 DefaultConsumer類實現了Consumer接口,因爲推送消息是異步的,所以在這裏提供了一個callback來緩衝接受到的消息。

先運行Recv 而後再運行Send,就能夠看到消息被接受輸出到控制檯了,若是多啓動幾個Recv,會發現消息被每一個消費者按順序分別消費了,

這也就是rabbitMQ默認採用Round-robin dispatching(輪詢分發機制)。

 

Work queues

上面簡單的實現了rabbitMQ消息的發送和接受,可是不管Send類中的queueDeclare 、basicPublish方法還有Recv類中的basicConsume方法都有不少的參數,

下面咱們分析一下幾個重要的參數。

(一)Message acknowledgment 消息答覆

上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),

在Channel接口中定義爲 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

這個autoAck咱們當前實現爲true,表示服務器會自動確認ack,一旦RabbitMQ將一個消息傳遞到consumer,它立刻會被標記爲刪除狀態。

這樣若是consumer在正常執行任務過程當中,一旦consumer服務掛了,那麼咱們就永遠的失去了這個consumer正在處理的全部消息。

爲了防止這種狀況,rabbitMQ支持Message acknowledgment,當消息被一個consumer接受並處理完成後,consumer發送給rabbitMQ一個回執,而後rabbitMQ纔會刪除這個消息。

當一個消息掛了,rabbitMQ會給另外可用的consumer繼續發送上個consumer由於掛了而沒有處理成功的消息。

所以咱們能夠設置autoAck=false,來顯示的讓服務端作消息成功執行的確認。

(二)Message durability 消息持久化

Message acknowledgment 確保了consumer掛了的狀況下,消息還能夠被其餘consumer接受處理,可是若是rabbitMQ掛了呢?

在聲明隊列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

durable=true 意味着該隊列將在服務器重啓後繼續存在。Send和Recv兩個類中聲明隊列的方法都要設置durable=true。

如今,咱們須要將消息標記爲持久性——經過將MessageProperties(它實現BasicProperties)設置爲PERSISTENT_TEXT_PLAIN

(三)Fair dispatch 公平分發

rabbitMQ默認是輪詢分發,這樣對多個consumer而言,可能就會出現負載不均衡的問題,不管是任務自己難易度,仍是consumer處理能力的不一樣,都是致使這種問題。

爲了處理這種狀況咱們可使用basicQos方法來設置prefetchCount = 1。 這告訴rabbitMQ一次只給consumer一條消息,換句話來講,就是直到consumer發回ack,而後再向這個consumer發送下一條消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount);

正是由於Fair dispatch是基於ack的,全部它最好和Message acknowledgment同時使用,不然在autoAck=true的狀況下,單獨設置Fair dispatch並無效果。

下面是本人測試以上三種狀況的測試代碼,能夠直接使用。

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
     
    private static final String QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        boolean durable = true;    //消息持久化
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多個消息使用空格分隔 Scanner sc
= new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private final static String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
    // basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來講,就是直到worker發回ack,而後再向這個worker發送下一條消息。
    channel.basicQos(1);
 
    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
 
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    // 當consumer確認收到某個消息,而且已經處理完成,RabbitMQ能夠刪除它時,consumer會向RabbitMQ發送一個ack(nowledgement)。
    boolean autoAck = true;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

    protected static void doWork(String message) throws InterruptedException {
        for (char ch: message.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

 

發佈/訂閱(Publish/Subscribe)

一個完整的rabbitMQ消息模型是會有Exchange的。

rabbitMQ的消息模型的核心思想是producer永遠不會直接發送任何消息到queue中,實際上,在不少狀況下producer根本不知道一條消息是否被髮送到了哪一個queue中。

在rabbitMQ中,producer僅僅將消息發送到一個exchange中。要理解exchange也很是簡單,它一邊負責接收producer發送的消息, 另外一邊將消息推送到queue中。

exchange必須清楚的知道在收到消息以後該如何進行下一步的處理,好比是否應該將這條消息發送到某個queue中? 仍是應該發送到多個queue中?仍是應該直接丟棄這條消息等。

exchange模型以下:

exchange類型也有好幾種:directtopicheaders以及fanout。

Fanout exchange

下面咱們來建立一個fanout類型的exchange,顧名思義,fanout會向全部的queue廣播全部收到的消息。

 1 import java.io.IOException;
 2 import java.util.Scanner;
 3 import java.util.concurrent.TimeoutException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 
 9 import rabbitMQ.RabbitMQTestUtil;
10 
11 public class EmitLog {
12      
13     private static final String EXCHANGE_NAME = "logs";
14  
15     public static void main(String[] argv) throws IOException, TimeoutException {
16  
17         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
18         Connection connection = factory.newConnection();
19         Channel channel = connection.createChannel();
20         
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22  
23         // 多個消息使用空格分隔
24         Scanner sc = new Scanner(System.in);
25         String[] splits = sc.nextLine().split(" ");
26         for (int i = 0; i < splits.length; i++) {
27                  channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes());
28              System.out.println(" [x] Sent '" + splits[i] + "'");
29         }
30  
31         channel.close();
32         connection.close();
33     }
34 }

 

 1 import java.io.IOException;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 import com.rabbitmq.client.Consumer;
 8 import com.rabbitmq.client.DefaultConsumer;
 9 import com.rabbitmq.client.Envelope;
10 
11 import rabbitMQ.RabbitMQTestUtil;
12 
13 public class ReceiveLogs {
14 
15     private static final String EXCHANGE_NAME = "logs";
16      
17       public static void main(String[] argv) throws Exception {
18         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
19           Connection connection = factory.newConnection();
20           Channel channel = connection.createChannel();
21      
22         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
23         String queueName = channel.queueDeclare().getQueue();
24         channel.queueBind(queueName, EXCHANGE_NAME, "");
25      
26         Consumer consumer = new DefaultConsumer(channel) {
27           @Override
28           public void handleDelivery(String consumerTag, Envelope envelope,
29                                      AMQP.BasicProperties properties, byte[] body) throws IOException {
30             String message = new String(body, "UTF-8");
31             System.out.println(" [x] Received '" + message + "'");
32           }
33         };
34         channel.basicConsume(queueName, true, consumer);
35       }
36 }

Direct exchange

在fanout的exchange類型中,消息的發佈已經隊列的綁定方法中,routingKey參數都是默認空值,由於fanout類型會直接忽略這個值,

可是在其餘exchange類型中它擁有很重要的意義,

      

rabbitMQ支持以上兩種綁定,消息在發佈的時候,會指定一個routing key,而圖一中exchange會把routing key爲orange發送的消息將會被路由到queue Q1中,使用routing key爲black或者green的將會被路由到Q2中。

將多個queue使用相同的binding key進行綁定也是可行的。能夠在X和Q1中間增長一個routing key black。 它會向全部匹配的queue進行廣播,使用routing key爲black發送的消息將會同時被Q1Q2接收。

 下面是我測試debug和error兩種routing key發佈消息並接受處理消息的代碼:

import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import rabbitMQ.RabbitMQTestUtil;

public class EmitLog {
     
    private static final String EXCHANGE_NAME = "direct_logs";
 
    public static void main(String[] argv)
                  throws java.io.IOException, TimeoutException {
 
            ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
            Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
        // 多個消息使用空格分隔
        Scanner sc = new Scanner(System.in);
        String[] splits = sc.nextLine().split(" ");
        for (int i = 0; i < splits.length; i++) {
                 channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes());
             System.out.println(" [x] Sent '" + splits[i] + "'");
        }
 
        channel.close();
        connection.close();
    }
}
View Code
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsDebug {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}
View Code
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsError {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}
View Code

發送輸入:

debug接受:

error接受:

 

Topic exchange

發送到topic exchange中的消息不能有一個任意的routing_key——它必須是一個使用點分隔的單詞列表。單詞能夠是任意的。一些有效的routing key例子:」stock.usd.nyse」,」nyse.vmw」,」quick.orange.rabbit」。

routing key的長度限制爲255個字節數。

binding key也必須是相同的形式。topic exchange背後的邏輯相似於direct——一條使用特定的routing key發送的消息將會被傳遞至全部使用與該routing key相同的binding key進行綁定的隊列中。 然而,對binding key來講有兩種特殊的狀況:

  1. *(star)能夠代替任意一個單詞
  2. #(hash)能夠代替0個或多個單詞

和Direct exchange差很少,代碼就不copy了,有興趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html

相關文章
相關標籤/搜索