RabbitMQ學習整理

一、什麼是消息隊列?

概念:html

消息隊列(Message Queue,簡稱MQ),本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是一些Messagejava

二、爲何要用消息隊列,應用場景?

不一樣系統、進程或者線程直接進行通訊。

 

系統解耦,將要作的部分放入隊列,便於模塊分離。

傳統模式spring

 

使用隊列解耦json

 

 

緩衝功能,當有大量請求要處理時,能夠先入隊而後依次處理,保證系統可靠性。

例如:淘寶秒殺活動等。服務器

 

 

異步操做,有時候爲了快速響應,可使用隊列來實現異步,好比郵箱驗證,手機驗證碼發送等。

 

三、消息隊列的幾種模式。

3-1、簡單模式(單生產者單消費者)

            

一個線程負責生產,一個線程負責總隊列裏取出來消費。app

3-2、單生產者多消費者

 

一個線程生產,多個線程取出來消費。異步

3-3、訂閱/發佈模式

 

 一個發佈者發送消息,多個訂閱者能夠同時獲取到發佈的消息ide

3-4、路由模式

 

3-5、主題模式,按規則模糊匹配

 

3-5、廣播模式

生產者發送的消息會發往每一個與其綁定的隊列。學習

 

4RabbitMQ的幾個組成部分

AMQP協議: Advanced Message Queuing Protocol 高級消息隊列協議,是一個異步消息傳遞所使用的應用層協議規範。fetch

 

組成:

         服務主機:接收客戶端請求,並做出相應處理

         虛擬主機:一個服務器能夠開啓多個Virtual Host,每一個虛擬主機都能提供完整的服務,有本身的權限控制。

         生產者:發送消息

         消費者:接收消息並處理

         交換器:RabbitMQ中,消息不是直接發往隊列,而是要先給交換器,而後交換器按照必定的路由規則發送到相對應的隊列上。

         路由Key:發送消息時要指定交換器和路由Key

         綁定:將隊列和交換器綁定起來路由Key做爲綁定時的關鍵字

         隊列:消息的載體,消費者從隊列中獲取消息,路由器根據路由規則把消息發往對應的隊列。

         消息:隊列中存儲的信息單元。

5、工做原理

生產者發送消息時指定交換器名和路由Key,而後交換器根據路由Key與綁定信息進行比對,找到對應的隊列後將信息發送出去。

消費者監聽某個隊列,若是有消息就取出來作對應操做,沒有就阻塞。


 

 

6、交換器的幾個工做模式

Direct:固定名稱匹配,只有路由Key與綁定的Key一致纔會將消息發送到該隊列。

Topic:主題模式,路由Key能夠用*#來填充

 

#能夠匹配任意多個單詞,*只能匹配一個單詞

好比bingdKey  x.y  x.y.z  a.y  a.b.z

x.*只能匹配x.y,而x.#能夠匹配x.y x.y.z

Fanout:廣播模式

7Demo

安裝RabbitMQ,並啓動服務。默認用戶名密碼guest。建立VirtualHost

7-1、單生產者單消費者

publicclass Recv {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "hello-exchange";

       channel.exchangeDeclare(exchangeName, "direct", true);

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       String routingKey = "hola";

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, routingKey);

 

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

    }

}

publicclass Send {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "hello-exchange";

        channel.exchangeDeclare(exchangeName, "direct", true);

 

        String routingKey = "hola";

        //發佈消息

        byte[] messageBodyBytes = "quit".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

 

    }

}

 

7-2、單生產者多消費者

11的相似,只不過兩個消費者共同消費一個隊列內的信息,複製一份消費者便可。

7-3、訂閱/發佈模式

生產者發送的消息,發往每一個訂閱他的消費者那裏。全部消費者均可以獲取相同的信息。

發佈者A

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "王力宏";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //發佈消息

        byte[] messageBodyBytes = "王力宏發佈的消息:啦啦啦啦啦".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

發佈者B

publicclass Send2 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "趙薇";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //發佈消息

        byte[] messageBodyBytes = "趙薇發佈的消息:啊啊啊啊".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

訂閱者A1A2

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "王力宏";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

               }

           });

       }

    }

}

訂閱者B1B2

publicclass Recv3 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       channel.exchangeDeclare("趙薇", "fanout", true);

       //聲明交換器

       String exchangeName = "趙薇";

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

 

}

 

7-4、主題模式

生產者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "topic-exchange";

        channel.exchangeDeclare(exchangeName, "topic", true);

 

        //

        String routingKey = "#.B";

       

        //發佈消息

        for (inti = 0; i < 3; i++) {

            channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes());

       }

        //byte[] messageBodyBytes = "匹配消息".getBytes();

        //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();

        conn.close();

    }

}

消費者A

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "topic-exchange";

       channel.exchangeDeclare(exchangeName, "topic", true);

       //聲明隊列

       String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();

      

       String routingKey = "X.A";

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

              }

           });

       }

    }

}

消費者BroutingKey routingKey.A

消費者CroutingKey  routingKey.B

7-5、廣播模式,不須要管routingKeybindingKey是否匹配。

生產者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "fanout-exchange";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //

        String routingKey = "hola";

       

        //發佈消息

        byte[] messageBodyBytes = "羣發消息".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

消費者

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "fanout-exchange";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

      

       String routingKey = "hola2";

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

}

 

8RabbitMQSpring整合

8-1、添加依賴

<dependency>

  <groupId>com.rabbitmq</groupId>

  <artifactId>amqp-client</artifactId>

  <version>x.x.x</version>

</dependency>

<dependency>

    <groupId>org.springframework.amqp</groupId>

    <artifactId>spring-rabbit</artifactId>

    <version>x.x.xRELEASE</version>

</dependency>

8-2、配置文件中加入rabbit服務鏈接配置

mq.host=real_host

mq.username=guest

mq.password=guest

mq.port=5672

mq.vhost=real_vhost

8-3、新建application-mq.xml文件,添加配置信息

主要用來配置鏈接信息、Producer配置、隊列聲明、交換器聲明、隊列與交換器的綁定、隊列的監聽器配置(即消費者)等。

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- 全局配置 -->

    <!-- 定義RabbitMQ的鏈接工廠 -->

    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>

    <!-- MQ的管理,包括隊列、交換器等 -->

    <rabbit:admin connection-factory="connectionFactory"/>

 

    <!-- Sender配置 -->

    <!-- spring template聲明-->

    <!-- 能夠不指定交換器,在每次發送請求時須要指明發給哪一個交換器 <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"/> -->

    <rabbit:template exchange="test" id="amqpTemplate"  connection-factory="connectionFactory"/><!-- message-converter="jsonMessageConverter" />    -->

    <!-- 消息對象json轉換類 -->

    <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> 

    -->

 

    <!--定義queue  說明:durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列-->

    <rabbit:queue name="mq.A" durable="true" auto-delete="false" exclusive="false" />

    <rabbit:queue name="mq.B" durable="true" auto-delete="false" exclusive="false" />

    <rabbit:queue name="mq.C" durable="true" auto-delete="false" exclusive="false" />

   

    <!-- 定義交換機,而且完成隊列和交換機的綁定 -->

    <rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test">

        <rabbit:bindings>

            <rabbit:binding queue="mq.A" key="key.A"/>

            <rabbit:binding queue="mq.B" key="key.B"/>

            <rabbit:binding queue="mq.C" key="key.C"/>

        </rabbit:bindings>

    </rabbit:direct-exchange>

   

    <!--

         queues:監聽的隊列,多個的話用逗號(,)分隔

        ref:監聽器

     -->

    <!-- 配置監聽  acknowledeg = "manual"   設置手動應答  當消息處理失敗時:會一直重發  直到消息處理成功  -->

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">

    <!-- 配置監聽器 -->

        <rabbit:listener queues="mq.A" ref="listenerA"/>

        <rabbit:listener queues="mq.B" ref="listenerB"/>

        <rabbit:listener queues="mq.C" ref="listenerC"/>

    </rabbit:listener-container>

</beans>

 

生產者:Spring提供的AmqpTemplate,使用註解注入便可使用

@Autowired

private AmqpTemplate amqpTemplate;

publicvoid sendMsg(String msg) {

    amqpTemplate.convertAndSend("routingKey", "發送了消息A");

}

 

監聽器:即收到隊列的消息後做何處理,要實現ChannelAwareMessageListener

例如:監聽器listenerA

@Component

publicclass ListenerA implements ChannelAwareMessageListener {

 

      privatefinalstatic Log logger = LogFactory.getLog(ListenerA.class);

   

    @Override

    publicvoid onMessage(Message message, Channel channel) throws Exception {

       // TODO Auto-generated method stub

       String msg = new String(message.getBody());

       System.out.println("A received : " + msg);

       logger.error(msg);

    }

}

 

 

 

9、持久化機制

有可能遇到程序崩潰或者Rabbit服務器宕機的狀況,那麼若是沒有持久化機制,全部數據都會丟失。

交換器持久化

Durable:是否持久化參數設爲True便可

channel.exchangeDeclare(exchangeName, type, true);

隊列持久化channel.queueDeclare("A",true,false,false,null).getQueue();

消息持久化

在以前,消息分發給consumer後當即就會被標記爲已消費,這時候若是consumber接到了一個消息可是尚未來的及處理就異常退出,那麼這個消息的狀態是已被消費的,因而就會形成消息丟失的問題。

 

處理的代碼也很簡單,一共有兩個步驟。第一個把autoAck改爲false

//消費結果須要進行確認

 channel.BasicConsume("firstTest", false, consumer);

第二部分就是在咱們消費完成後進行確認

//進行交付,肯定此消息已經處理完成

channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);

若是沒有進行確認queue會把這個消息交給其它的consumer去處理,若是沒有交付的代碼,那麼這個消息會一直存在。

 

消息持久化步驟:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

                    throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props,        byte[] body)

                    throws IOException;

exchange表示exchange的名稱
routingKey
表示routingKey的名稱
body
表明發送的消息體

MessageProperties.PERSISTENT_TEXT_PLAIN  能夠設置爲持久化,類型爲文本

MessageProperties.PERSISTENT_BASIC    類型爲二進制數據

mandatorymandatory標誌位設置爲true時,若是exchange沒法找到一個隊列取轉發,就返回給生產者。

immediateimmediate標誌位設置爲true時,若是exchange要轉發的隊列上沒有消費者時,就返回給生產者。

10、消息確認機制

概述

RabbitMQ可能會遇到的一個問題,即生成者不知道消息是否真正到達broker,那麼有沒有高效的解決方式呢?答案是採用Confirm模式。

producerconfirm模式的實現原理

生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID,這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ackmultiple域,表示到這個序列號以前的全部消息都已經獲得了處理。

confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。

channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack

確認機制的三種實現

  1. 普通confirm模式:每發送一條消息後,調用waitForConfirms()方法,等待服務器端confirm。其實是一種串行confirm了。
  2. 批量confirm模式:每發送一批消息後,調用waitForConfirms()方法,等待服務器端confirm
  3. 異步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息後Client端會回調這個方法。

普通confirm

//開啓Procedure確認機制

channel.confirmSelect();

//發佈消息

channel.basicPublish(exchangeName, routingKey,null,message);

//消息發送成功的確認,也能夠設置超時時間

if (channel.waitForConfirms([long timeOut]) {

    System.out.println("send success...");

} else {

    System.out.println("send failed...");

}

批量Confirm

批量發送消息後再進行確認。

異步Confirm

//待確認的序列

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

//開啓確認機制

channel.confirmSelect();

//添加處理事件

channel.addConfirmListener(new ConfirmListener() {

    publicvoid handleAck(longdeliveryTag, booleanmultiple) throws IOException {

       if (multiple) {

           confirmSet.headSet(deliveryTag + 1).clear();

       } else {

           confirmSet.remove(deliveryTag);

       }

       System.out.println("發送成功...");

    }

    publicvoid handleNack(longdeliveryTag, booleanmultiple) throws IOException {

       System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);

       if (multiple) {

           confirmSet.headSet(deliveryTag + 1).clear();

       } else {

           confirmSet.remove(deliveryTag);

       }

       System.err.println("發送失敗...");

    }

});

 

//發送消息

for (inti = 0; i < 10; i++) {

    //獲取下個發送序號

    longnextSeqNo = channel.getNextPublishSeqNo();

    channel.basicPublish(exchangeName, routingKey, null, ("車票ID:" + i).getBytes());

    //加入待處理集合中

    confirmSet.add(nextSeqNo);

    //休息0.2s

    Thread.sleep(200);

}

 

 

 

 

 

 

Consumer端的確認

自動確認,默認是自動確認,即獲取消息後,直接確認。

手動確認,給當前消息設置狀態,當手動ack後服務端纔會刪除該消息,若是返回nack,從新入隊。

//手動確認

booleanautoAck = false;

       channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {

           @Override

           publicvoid handleDelivery(String consumerTag,

                  Envelope envelope,  

                  AMQP.BasicProperties properties,

                  byte[] body) throws IOException {

              //…其餘處理操做

longdeliveryTag = envelope.getDeliveryTag();

              //確認消息

              channel.basicAck(deliveryTag, false);

           }

       });

 

附件:

https://blog.csdn.net/wangshuminjava/article/details/80998992

深刻解讀RabbitMQ工做原理及簡單使用

http://www.javashuo.com/article/p-cabanpbf-gk.html

RabbitMQ深刻學習指導

 

一、什麼是消息隊列?

概念:

消息隊列(Message Queue,簡稱MQ),本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是一些Message

二、爲何要用消息隊列,應用場景?

不一樣系統、進程或者線程直接進行通訊。

 

系統解耦,將要作的部分放入隊列,便於模塊分離。

傳統模式

 

使用隊列解耦

 

 

緩衝功能,當有大量請求要處理時,能夠先入隊而後依次處理,保證系統可靠性。

例如:淘寶秒殺活動等。

 

 

異步操做,有時候爲了快速響應,可使用隊列來實現異步,好比郵箱驗證,手機驗證碼發送等。

 

三、消息隊列的幾種模式。

3-1、簡單模式(單生產者單消費者)

            

一個線程負責生產,一個線程負責總隊列裏取出來消費。

3-2、單生產者多消費者

 

一個線程生產,多個線程取出來消費。

3-3、訂閱/發佈模式

 

 一個發佈者發送消息,多個訂閱者能夠同時獲取到發佈的消息

3-4、路由模式

 

3-5、主題模式,按規則模糊匹配

 

3-5、廣播模式

生產者發送的消息會發往每一個與其綁定的隊列。

 

4RabbitMQ的幾個組成部分

AMQP協議: Advanced Message Queuing Protocol 高級消息隊列協議,是一個異步消息傳遞所使用的應用層協議規範。

 

組成:

         服務主機:接收客戶端請求,並做出相應處理

         虛擬主機:一個服務器能夠開啓多個Virtual Host,每一個虛擬主機都能提供完整的服務,有本身的權限控制。

         生產者:發送消息

         消費者:接收消息並處理

         交換器:RabbitMQ中,消息不是直接發往隊列,而是要先給交換器,而後交換器按照必定的路由規則發送到相對應的隊列上。

         路由Key:發送消息時要指定交換器和路由Key

         綁定:將隊列和交換器綁定起來路由Key做爲綁定時的關鍵字

         隊列:消息的載體,消費者從隊列中獲取消息,路由器根據路由規則把消息發往對應的隊列。

         消息:隊列中存儲的信息單元。

5、工做原理

生產者發送消息時指定交換器名和路由Key,而後交換器根據路由Key與綁定信息進行比對,找到對應的隊列後將信息發送出去。

消費者監聽某個隊列,若是有消息就取出來作對應操做,沒有就阻塞。


 

 

6、交換器的幾個工做模式

Direct:固定名稱匹配,只有路由Key與綁定的Key一致纔會將消息發送到該隊列。

Topic:主題模式,路由Key能夠用*#來填充

 

#能夠匹配任意多個單詞,*只能匹配一個單詞

好比bingdKey  x.y  x.y.z  a.y  a.b.z

x.*只能匹配x.y,而x.#能夠匹配x.y x.y.z

Fanout:廣播模式

7Demo

安裝RabbitMQ,並啓動服務。默認用戶名密碼guest。建立VirtualHost

7-1、單生產者單消費者

publicclass Recv {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "hello-exchange";

       channel.exchangeDeclare(exchangeName, "direct", true);

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       String routingKey = "hola";

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, routingKey);

 

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

    }

}

publicclass Send {

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "hello-exchange";

        channel.exchangeDeclare(exchangeName, "direct", true);

 

        String routingKey = "hola";

        //發佈消息

        byte[] messageBodyBytes = "quit".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

 

    }

}

 

7-2、單生產者多消費者

11的相似,只不過兩個消費者共同消費一個隊列內的信息,複製一份消費者便可。

7-3、訂閱/發佈模式

生產者發送的消息,發往每一個訂閱他的消費者那裏。全部消費者均可以獲取相同的信息。

發佈者A

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "王力宏";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //發佈消息

        byte[] messageBodyBytes = "王力宏發佈的消息:啦啦啦啦啦".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

發佈者B

publicclass Send2 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "趙薇";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //發佈消息

        byte[] messageBodyBytes = "趙薇發佈的消息:啊啊啊啊".getBytes();

        channel.basicPublish(exchangeName, "", null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

訂閱者A1A2

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "王力宏";

       channel.exchangeDeclare(exchangeName, "fanout", true);

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

               }

           });

       }

    }

}

訂閱者B1B2

publicclass Recv3 {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       channel.exchangeDeclare("趙薇", "fanout", true);

       //聲明交換器

       String exchangeName = "趙薇";

       //聲明隊列

       String queueName = channel.queueDeclare().getQueue();

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, "");

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body, "UTF-8");

                  System.out.println(bodyStr);

 

              }

           });

       }

    }

 

}

 

7-4、主題模式

生產者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "topic-exchange";

        channel.exchangeDeclare(exchangeName, "topic", true);

 

        //

        String routingKey = "#.B";

       

        //發佈消息

        for (inti = 0; i < 3; i++) {

            channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes());

       }

        //byte[] messageBodyBytes = "匹配消息".getBytes();

        //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();

        conn.close();

    }

}

消費者A

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //得到信道

       final Channel channel = conn.createChannel();

       //聲明交換器

       String exchangeName = "topic-exchange";

       channel.exchangeDeclare(exchangeName, "topic", true);

       //聲明隊列

       String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();

      

       String routingKey = "X.A";

       //綁定隊列,經過鍵 hola將隊列和交換器綁定起來

       channel.queueBind(queueName, exchangeName, routingKey);

 

       while(true) {

           //消費消息

           booleanautoAck = false;

           String consumerTag = "";

           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {

              @Override

              publicvoid handleDelivery(String consumerTag,

                     Envelope envelope,  

                     AMQP.BasicProperties properties,

                     byte[] body) throws IOException {

                  String routingKey = envelope.getRoutingKey();

                  String contentType = properties.getContentType();

                  System.out.println("消費的路由鍵:" + routingKey);

                  System.out.println("消費的內容類型:" + contentType);

                  longdeliveryTag = envelope.getDeliveryTag();

                  //確認消息

                  channel.basicAck(deliveryTag, false);

                  System.out.println("消費的消息體內容:");

                  String bodyStr = new String(body);

                  System.out.println(bodyStr);

              }

           });

       }

    }

}

消費者BroutingKey routingKey.A

消費者CroutingKey  routingKey.B

7-5、廣播模式,不須要管routingKeybindingKey是否匹配。

生產者

publicclass Send {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

      

       //建立鏈接工廠

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        //設置 RabbitMQ 地址

        factory.setHost("localhost");

        //創建到代理服務器到鏈接

        Connection conn = factory.newConnection();

        //得到信道

        Channel channel = conn.createChannel();

        //聲明交換器

        String exchangeName = "fanout-exchange";

        channel.exchangeDeclare(exchangeName, "fanout", true);

 

        //

        String routingKey = "hola";

       

        //發佈消息

        byte[] messageBodyBytes = "羣發消息".getBytes();

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

 

        channel.close();

        conn.close();

    }

}

消費者

publicclass Recv {

 

    publicstaticvoid main(String[] args) throws IOException, TimeoutException {

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setUsername("guest");

       factory.setPassword("guest");

       factory.setHost("localhost");

       //創建到代理服務器到鏈接

       Connection conn = factory.newConnection();

       //

相關文章
相關標籤/搜索