RabbitMQ消息中間件

消息中間件簡單介紹java

消息中間件(消息隊列)是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題實現高性能,高可用,可伸縮和最終一致性[架構] 使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
 
那什麼是RabbitMQ呢?
  • RabbitMQ是一個由Erlang語言發開的AMQP的開源實現
  • AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。
  • RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:
可靠性(Reliability) 使用了一些機制來保證可靠性,如持久、傳輸確認、發佈確認
靈活性(Flexible Routing) 在消息進入隊列以前,經過Exchange來路由消息,能夠將多個Exchange綁定一塊兒
消息集羣(Clustering) 多個RabbitMQ服務器開以組成一個集羣,造成一個邏輯Broker
高可用(Highly Available Queues) 隊列能夠在集羣中機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍能夠用
多中協議(Multi-protocol) 支持多種消息隊列協議,好比STOMP、MQTT等等
多語言客戶端(Many Clients) 幾乎支持全部經常使用語言,好比java、.NET、Ruby等等
管理界面(Management UI) 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息Broker的許多方面
跟蹤機制(Tracing) 若是消息異常,RabbitMQ提供了消息跟蹤機制,使用者能夠找出發生了什麼
插件機制(Plugin System) 提供了許多插件,從多方面進行擴展,也可編寫本身的插件

架構圖redis

主要概念spring

RabbitMQ Server: 也叫broker server,它是一種傳輸服務。 他的角色就是維護一條從Producer到Consumer的路線,保證數據可以按照指定的方式進行傳輸。
Producer: 消息生產者,如圖A、B、C,數據的發送方。消息生產者鏈接RabbitMQ服務器而後將消息投遞到Exchange。
Consumer:消息消費者,如圖一、二、3,數據的接收方。消息消費者訂閱隊列,RabbitMQ將Queue中的消息發送到消息消費者。
Exchange:生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。Exchange並不存儲消息。RabbitMQ中的Exchange有direct、fanout、topic、headers四種類型,每種類型對應不一樣的路由規則。
Queue:(隊列)是RabbitMQ的內部對象,用於存儲消息。消息消費者就是經過訂閱隊列來獲取消息的,RabbitMQ中的消息都只能存儲在Queue中,生產者生產消息並最終投遞到Queue中,消費者能夠從Queue中獲取消息並消費。多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
RoutingKey:生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。在Exchange Type與binding key固定的狀況下(在正常使用時通常這些內容都是固定配置好的),咱們的生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。RabbitMQ爲routing key設定的 長度限制爲255bytes。
Connection: (鏈接):Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的。之後咱們能夠看到,程序的起始處就是創建這個TCP鏈接。
Channels: (信道):它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。也就是說,通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。
VirtualHost:權限控制的基本單位,一個VirtualHost裏面有若干Exchange和MessageQueue,以及指定被哪些user使用 
 
RabbitMQ安裝與啓動
(1)選擇相應的版本下載並安裝 Eralng
(2)下載好後雙擊安裝, 注意不要安裝在包含中文和空格的目錄下!安裝後window服務中就存在rabbitMQ了,而且是啓動狀態。
(3)安裝管理界面(插件):進入rabbitMQ安裝目錄的sbin目錄,輸入命令
rabbitmq‐plugins enable rabbitmq_management 

(4)從新啓動服務瀏覽器

(5)打開瀏覽器,地址欄輸入http://127.0.0.1:15672 ,便可看到管理界面的登錄頁服務器

 
輸入用戶名和密碼,都爲guest 進入主界面:
 
RabbitMQ發送與接收消息
直接模式(Direct)
建立隊列,名爲queue.test
 
代碼實現-消息生產者
(1)建立工程rabbitmq_demo,引入依賴 ,pom.xml以下:
<dependency> 
     <groupId>org.springframework.amqp</groupId> 
     <artifactId>spring‐rabbit</artifactId>
     <version>2.1.4.RELEASE</version>
</dependency>
(2)編寫配置文件applicationContext-rabbitmq-producer.xml 
<?xml version="1.0" encoding="UTF‐8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"     
            xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" 
            xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
            xsi:schemaLocation="http://www.springframework.org/schema/beans 
            http://www.springframework.org/schema/beans/spring‐beans.xsd 
            http://www.springframework.org/schema/rabbit 
            http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd"> 
    <!‐‐鏈接工廠‐‐> 
    <rabbit:connection‐factory id="connectionFactory"
        host="127.0.0.1"
        port="5672"
        username="guest"
        password="guest" /> <rabbit:template id="rabbitTemplate" connection‐ factory="connectionFactory" />
</beans>
(3)消息生產者(發送消息)
 1      @Test
     public void test() { 2 //解析配置文件,從中獲取RabbitTemplate對象 3 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml"); 4 RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate"); 5 //發送消息(queue.test這個隊列名稱須要在RabbitMQ中手動建立) 6 rabbitTemplate.convertAndSend("","queue.test","直接模式"); 7 System.out.println("消息發送成功"); 8 //關閉對象 9 ((ClassPathXmlApplicationContext) context).close(); 10 }

代碼實現、消息消費者架構

(1)編寫消息監聽類
1 public class MessageConsumer implements MessageListener {
2     public void onMessage(Message message) {
3         System.out.println("接收到消息:" +new String(message.getBody()) );
4     }
5 }
(2)建立配置文件applicationContext-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--鏈接工廠-->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"/>
    <!--隊列名稱-->
    <rabbit:queue name="queue.test" />
    <!--消費者監聽類class爲監聽類路徑-->
    <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"> </bean>
    <!--設置監聽容器-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queue-names="queue.test" ref="messageConsumer"/>
    </rabbit:listener-container>
</beans>
(3)編寫測試代碼Test2
    @Test
    public void test2(){
        //運行此方法會自動調用消息監聽類,並打印監聽到的消息
        ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-consumer.xml");
    }

運行結果app

接收到消息:直接模式

 

分列模式(Fanout)
1.這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。
2.這種模式不須要RouteKey
3.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
代碼實現(修改直接模式代碼便可)
修改、消息生產者(發送消息)
rabbitTemplate.convertAndSend("exchange.fanout_test","","分列模式走起");

修改消息、消費者配置文件(applicationContext-rabbitmq-consumer.xml)異步

    <!--隊列名稱-->
    <rabbit:queue name="queue.test" />
    <rabbit:queue name="queue.test1" />

    <!--消費者監聽類class爲監聽類路徑-->
    <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"></bean>
    <bean id="messageConsumer1" class="cn.itcast.demo.MessageConsumer1"> </bean>

    <!--設置監聽容器-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queue-names="queue.test" ref="messageConsumer"/>
        <rabbit:listener queue-names="queue.test1" ref="messageConsumer1"/>
    </rabbit:listener-container>

在增長一個消息監聽者類,而後運行測試代碼Test2便可分佈式

運行結果性能

我是Message、接收到消息:分列模式走起

我是Message一、接收到消息:分列模式走起

 

死信交換器 Dead Letter Exchanges
一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。
(1) 一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。
(2)消息存活時間到了,消息過時了。
(3)隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
代碼實現
建立配置文件applicationContext-mq.xml( 交換器和隊列是自動配置,無需手動配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--配置connection-factory,指定鏈接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/>
    <rabbit:admin connection-factory="connectionFactory"></rabbit:admin>

    <!--建立交換機(過時隊列的交換機)-->
    <rabbit:direct-exchange id="exchange.delay.order.begin" name="exchange.delay.order.begin" durable="false" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue.delay.order.begin" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 延時隊列(過時隊列) -->
    <rabbit:queue name="queue.delay.order.begin" durable="false">
        <rabbit:queue-arguments>
            <!-- 設置隊列過時時間爲1分鐘 -->
            <entry key="x-message-ttl" value="60000" value-type="java.lang.Long"/>
            <entry key="x-dead-letter-exchange" value="exchange.delay.order.done"/>
            <entry key="x-dead-letter-routing-key" value="delay"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--死信交換機定義-->
    <rabbit:direct-exchange id="exchange.delay.order.done" name="exchange.delay.order.done" durable="false" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue.delay.order.done" key="delay"/>
            <!-- binding key 相同爲 【delay】exchange轉發消息到多個隊列 -->
            <!--<rabbit:binding queue="queue.delay.order.done.two" key="delay" />-->
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <rabbit:queue name="queue.delay.order.done" durable="false"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!-- 消息接收者 -->
    <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false">
        <rabbit:listener queues="queue.delay.order.done" ref="orderMessageListener"/>
    </rabbit:listener-container>

    <!--建立一個消息監聽對象-->
    <bean class="cn.itcast.demo.MessageConsumer" id="orderMessageListener"></bean>
</beans>

建立測試類消息生產者(發送消息)

    @Test
    public void test1() {
        //解析配置文件,從中獲取RabbitTemplate對象
        ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml");
        RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate");
        rabbitTemplate.convertAndSend("exchange.delay.order.begin","delay","延時隊列模式走起");
        System.out.println("消息發送成功");
        //關閉對象
        ((ClassPathXmlApplicationContext) context).close();
    }

建立消息監聽器(MessageConsumer)

public class MessageConsumer implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("我是Message、接收到消息:" +new String(message.getBody()) );
    }
}

建立測試類消息消費者

    @Test
    public void test2(){
        //運行此方法會自動調用消息監聽類,並打印監聽到的消息
        ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml");
    }

運行結果(因爲死信隊列時間設置爲1分鐘,因此須要一分鐘後纔會監聽到消息)

我是Message、接收到消息:延時隊列模式走起

 

如何解決消息重複消費

讓生產者發送每條數據的時候,裏面加一個全局惟一的id,相似訂單id之類的東西,而後你這裏消費到了以後,先根據這個id去好比redis裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個idredis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。

相關文章
相關標籤/搜索