消息隊列rabitMq

rabbitmq

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求html

使用場景

在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提升了系統的吞吐量。java

含義

RabbitMQ是一個在AMQP基礎上完成的,可複用的企業消息系統。他遵循Mozilla Public License開源協議緩存

客戶端

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] args) throws.IOException{
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  String message = "Hello World!";
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  System.out.println(" [x] Sent '" + message + "'");
  channel.close();
  connection.close();
  }
}

消費者端

public class RabbitMQRecv {
  public static void main(String avg[]) throws.IOException,java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection
= factory.newConnection();     Channel channel = connection.createChannel();     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.
out.println(" [*] Waiting for messages. To exit press CTRL+C");     QueueingConsumer consumer = new QueueingConsumer(channel);     channel.basicConsume(QUEUE_NAME, true, consumer);
    
while (true) {       QueueingConsumer.Delivery delivery = consumer.nextDelivery();       String message = new String(delivery.getBody());       System.out.println(" [x] Received '" + message + "'");     }   } }

幾個概念

Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即爲符合什麼樣路由規則的消息,將會放置入哪個 消息隊列
 

RabbitMQ的結構圖以下:

 

 

 

幾個概念說明:
Broker:簡單來講就是消息隊列服務器實體。
  Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
  Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  producer:消息生產者,就是投遞消息的程序。
  consumer:消息消費者,就是接受消息的程序。
  channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
消息隊列的使用過程大概以下:
(1)客戶端鏈接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,並設置相關屬性。
  (3)客戶端聲明一個queue,並設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  (5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,徹底根據key進行投遞的叫 作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作 Topic交換機,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不須要key的,叫作 Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,爲了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
  (1)exchange持久化,在聲明時指定durable => 1
  (2)queue持久化,在聲明時指定durable => 1
  (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。
 
 

什麼是MQ?安全

       MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取隊列中的消息。服務器

      RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。app

一、隊列、生產者、消費者負載均衡

      隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)能夠從隊列中獲取消息並消費。異步

     

      多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。ide

     

二、Exchange、Binding函數

      剛纔咱們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),再經過Binding將Exchange與Queue關聯起來。

     

三、Exchange Type、Bingding key、routing key

      在綁定(Binding)Exchange與Queue的同時,通常會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的binding key。

      生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。

      RabbitMQ經常使用的Exchange Type有三種:fanout、direct、topic。

      fanout:把全部發送到該Exchange的消息投遞到全部與它綁定的隊列中。

      direct:把消息投遞到那些binding key與routing key徹底匹配的隊列中。

      topic:將消息路由到binding key與routing key模式匹配的隊列中。

      附上一張RabbitMQ的結構圖:

     

    

最後來具體解析一下幾個問題:

一、能夠自動建立隊列,也能夠手動建立隊列,若是自動建立隊列,那麼是誰負責建立隊列呢?是生產者?仍是消費者? 

      若是隊列不存在,固然消費者不會收到任何的消息。可是若是隊列不存在,那麼生產者發送的消息就會丟失。因此,爲了數據不丟失,消費者和生產者均可以建立隊列。那麼若是建立一個已經存在的隊列呢?那麼不會有任何的影響。須要注意的是沒有任何的影響,也就是說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是隊列屬性並不會改變。

      隊列對於負載均衡的處理是完美的。對於多個消費者來講,RabbitMQ使用輪詢的方式均衡的發送給不一樣的消費者。

二、RabbitMQ的消息確認機制

      默認狀況下,若是消息已經被某個消費者正確的接收到了,那麼該消息就會被從隊列中移除。固然也可讓同一個消息發送到不少的消費者。

      若是一個隊列沒有消費者,那麼,若是這個隊列有數據到達,那麼這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被當即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。

     那麼什麼是正確收到呢?經過ack。每一個消息都要被acknowledged(確認,ack)。咱們能夠顯示的在程序中去ack,也能夠自動的ack。若是有數據沒有被ack,那麼:

     RabbitMQ Server會把這個信息發送到下一個消費者。

     若是這個app有bug,忘記了ack,那麼RabbitMQServer不會再發送數據給它,由於Server認爲這個消費者處理能力有限。

    並且ack的機制能夠起到限流的做用(Benefitto throttling):在消費者處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的均衡消費者的負載。

 

 二:代碼示例

2.1:首先引入rabbitMQ jar包

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
 </dependency>

2.2:建立消費者Producer

/**
 * 消息生成者
 */
public class Producer {
    public final static String QUEUE_NAME="rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ相關信息
        factory.setHost("localhost");
      //factory.setUsername("lp");
      //factory.setPassword("");
     // factory.setPort(2088);
        //建立一個新的鏈接
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //  聲明一個隊列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello RabbitMQ";
        //發送消息到隊列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Send +'" + message + "'");
        //關閉通道和鏈接
        channel.close();
        connection.close();
    }
}

注1:queueDeclare第一個參數表示隊列名稱、第二個參數爲是否持久化(true表示是,隊列將在服務器重啓時生存)、第三個參數爲是不是獨佔隊列(建立者可使用的私有隊列,斷開後自動刪除)、第四個參數爲當全部消費者客戶端鏈接斷開時是否自動刪除隊列、第五個參數爲隊列的其餘參數

注2:basicPublish第一個參數爲交換機名稱、第二個參數爲隊列映射的路由key、第三個參數爲消息的其餘屬性、第四個參數爲發送信息的主體

2.3:建立消費者

 

複製代碼
public class Customer {
    private final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ地址
        factory.setHost("localhost");
        //建立一個新的鏈接
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //聲明要關注的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        System.out.println("Customer Waiting Received messages");
        //DefaultConsumer類實現了Consumer接口,經過傳入一個頻道,
        // 告訴服務器咱們須要那個頻道的消息,若是頻道中有消息,就會執行回調函數handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
            }
        };
        //自動回覆隊列應答 -- RabbitMQ中的消息確認機制
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
複製代碼

前面代碼咱們能夠看出和生成者同樣的,後面的是獲取生產者發送的信息,其中envelope主要存放生產者相關信息(好比交換機、路由key等)body是消息實體。

2.4:運行結果

生產者:

 

消費者:

 三:實現任務分發

工做隊列

一個隊列的優勢就是很容易處理並行化的工做能力,可是若是咱們積累了大量的工做,咱們就須要更多的工做者來處理,這裏就要採用分佈機制了。

咱們新建立一個生產者NewTask

複製代碼
public class NewTask {
    private static final String TASK_QUEUE_NAME="task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
   channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分發信息
        for (int i=0;i<10;i++){
            String message="Hello RabbitMQ"+i;
            channel.basicPublish("",TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send '"+message+"'");
        }
        channel.close();
        connection.close();
    }
}
複製代碼

而後建立2個工做者Work1和Work2代碼同樣

複製代碼
public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Worker1  Waiting for messages");

        //每次從隊列獲取的數量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1  Received '" + message + "'");
                try {
                    throw  new Exception();
                    //doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("Worker1 Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
        //消息消費完成確認
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暫停1秒鐘
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
複製代碼

注:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回覆,若是爲true的話,每次生產者只要發送信息就會從內存中刪除,那麼若是消費者程序異常退出,那麼就沒法獲取數據,咱們固然是不但願出現這樣的狀況,因此纔去手動回覆,每當消費者收到並處理信息而後在通知生成者。最後從隊列中刪除這條信息。若是消費者異常退出,若是還有其餘消費者,那麼就會把隊列中的消息發送給其餘消費者,若是沒有,等消費者啓動時候再次發送。

 

 

參考:http://www.javashuo.com/article/p-vqbsgsxl-dt.html

相關文章
相關標籤/搜索