SSM框架下配置rabbitMQ

1、準備工做:

第一步:下載erlang  
http://www.erlang.org/downloads/19.3html


第二步:下載rabbitMQ 
http://www.rabbitmq.com/download.htmlspring

rabbitMQ安裝完成後,打開rabbitMQ控制檯 輸入:rabbitmq-plugins enable rabbitmq_management 
打開網址 http://127.0.0.1:15672/  默認帳號 guest/guestapi


項目配置:
1、pom文件的配置:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>瀏覽器

或者服務器

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.6.0</version>
</dependency>
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.6.5.RELEASE</version>
</dependency>網絡

2、rabitmq.properties文件配置:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672mybatis

3、spring-mybatis.xml文件配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

    <rabbit:connection-factory id="connectionFactory"
                               username="${mq.username}" password="${mq.password}" host="${mq.host}" port="${mq.port}"
                               virtual-host="/" />

    <!-- 定義rabbit template 用於數據的接收和發送 -->
    <rabbit:template id="amqTemplate" connection-factory="connectionFactory"
                     exchange="Q_PAY_PPMS_RECON"></rabbit:template>

    <!-- 經過指定下面的admin信息,當前productor中的exchange和queue會在rabbitmq服務器上自動生成 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定義queue  說明:durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列-->
    <rabbit:queue name="chase1" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="chase2" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="chase3" durable="true" auto-delete="false" exclusive="false" />
    <!--topic 模式:發送端不是按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此。 -->
    <rabbit:topic-exchange name="mq.asdfExChange"
                           durable="true" auto-delete="false">
        <rabbit:bindings>
            <!-- 配置多個消費者 根據不一樣業務類型選擇對應消費者  pattern="*.*.test1" -->
            <rabbit:binding queue="chase1" pattern="mq.*.send"></rabbit:binding>
            <rabbit:binding queue="chase2" pattern="mq.*.send"></rabbit:binding>
            <rabbit:binding queue="chase3" pattern="mq.*.send"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- fanout 模式:客戶端中只要是與該路由綁定在一塊兒的隊列都會收到相關消息,這相似廣播,發送端無論隊列是誰,都由客戶端本身去綁定,誰須要數據誰去綁定本身的相應隊列 -->
    <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="chase1"/>
            <rabbit:binding queue="chase2"/>
            <rabbit:binding queue="chase3"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--定義direct-exchange direct 消息轉換隊列 綁定key,意思就是消息與一個特定的路由鍵匹配,會轉發。rabbit:binding:設置消息queue匹配的key -->
    <rabbit:direct-exchange name="mq.qwerExChange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="chase1" key="mq.qwer.send" ></rabbit:binding>
            <rabbit:binding queue="chase2" key="mq.qwer.send2" ></rabbit:binding>
            <rabbit:binding queue="chase3" key="mq.qwer.send3" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!-- 消息接收者 -->
    <bean id="asdfConsumer" class="com.rabbitmq.Consumor"></bean>
    <bean id="asdfConsumer2" class="com.rabbitmq.Consumor2"></bean>
    <bean id="qwerConsumer3" class="com.rabbitmq.Consumor3"></bean>
    
    <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象
    acknowledeg = "manual",意爲表示該消費者的ack方式爲手動-->
    <rabbit:listener-container connection-factory="connectionFactory"  acknowledge="manual">
        <rabbit:listener  queues="chase1"  ref="asdfConsumer"/>
    </rabbit:listener-container>

    <rabbit:listener-container connection-factory="connectionFactory" >
        <rabbit:listener  queues="chase2"  ref="asdfConsumer2"/>
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory" >
        <rabbit:listener  queues="chase3"  ref="qwerConsumer3"/>
    </rabbit:listener-container>
</beans>


4、spring主配置文件中引入剛剛添加的配置文件測試

    <!-- 加載配置文件 -->
    <context:property-placeholder location="classpath:rabbitMQ.properties" />
    
    <import resource="spring-rabbitmq.xml"></import>


5、生產者生產消息spa

在代碼中注入:
    @Autowired
    private AmqpTemplate amqpTemplate;
    
以下能夠將消息發送到交換器(mq.asdfExChange)中,交換器的工做模式爲topic模式,根據工做模式將消息發送到隊列chase1,chase2,chase3:code

amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdfExChange.send", msg);


6、消費者消費消息
定義在spring-mybatis.xml中配置的消費者:com.rabbitmq.Consumor和com.rabbitmq.Consumor2和com.rabbitmq.Consumor3
這裏只定義一個Consumor供參考:

public class Consumor implements ChannelAwareMessageListener {

    /**
     * message 消息實體
     * Channel 當前通道
     */
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("消費者接收到信息");
        String msg = new String(message.getBody(), "utf-8");
        //消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        //ack返回false,並從新回到隊列,api裏面解釋得很清楚
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        //true拒絕消息 false確認接受到消息
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("消費者消費掉了消息:" + msg);
    }
    
}


7、消息確認機制
首先介紹一下消息確認的幾種類型:
AcknowledgeMode.NONE:自動確認
AcknowledgeMode.AUTO:根據狀況確認
AcknowledgeMode.MANUAL:手動確認
消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠自動去ACK或手動ACK(在spring-mybatis.xml中配置acknowledge="manual")
1.自動確認會在消息發送給消費者後當即確認,但存在丟失消息的可能,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息
2.根據狀況確認會在消費者掛掉,待ack的消息迴歸到隊列中。消費者拋出異常,消息會不斷的被重發,直處處理成功。不會丟失消息,即使服務掛掉,沒有處理完成的消息會重回隊列,可是異常會讓消息不斷重試
3.若是消息已經被處理,但後續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也一樣形成了實際意義的消息丟失
4.若是手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確承認以在業務失敗後進行一些操做,若是消息未被 ACK 則會發送到下一個消費者(下一個消費者指的是什麼,沒有下一個消費者會怎樣)
5.若是某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,由於 RabbitMQ 認爲該服務的處理能力有限,標記爲uncheck狀態
6.手動確認方法 channel.basicAck(deliveryTag, multiple)
deliveryTag(惟一標識 ID):當一個消費者向 RabbitMQ 註冊後,會創建起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 deliveryTag, 
它表明了 RabbitMQ 向該 Channel 投遞的這條消息的惟一標識 ID,是一個單調遞增的正整數,deliveryTag 的範圍僅限於 Channel
multiple:爲了減小網絡流量,手動確承認以被批處理,當該參數爲 true 時,則能夠一次性確認 deliveryTag 小於等於傳入值的全部消息
7.否定消息方法  channel.basicNack(deliveryTag, multiple, requeue)
multiple:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。
requeue:被拒絕的是否從新入隊列
8.拒絕消息方法 channel.basicReject(deliveryTag, requeue)
requeue:被拒絕的是否從新入隊列


問題1:剛開始運行一切OK,後來我把<rabbit:topic-exchange>下的chase2和chase3刪除只留下chase1,結果發現發給chase1的信息,仍是同時發給了chase2和chase3
1.確認配置文件是否正常    
2.瀏覽器打開 http://127.0.0.1:15672 找到設置的交換器,好比我設置的交換器 mq.asdfExChange ,發現下面綁定channel列表裏依然是3個分別是chase1,chase2,chase3
原來雖然在配置文件更新了,可是在rabbitMQ裏沒有進行同步更新

問題2:rabbitMQ後臺管理中的ready、unchecked、total分別是什麼意思
ready是準備發送的消息  unchecked是消費了未確認  total=ready+unchecked

問題3:消費者消費消息時候出錯如何處理
通過測試發現,若是異常出如今消息確認以後,不影響後面的消息推送。
若是異常出如今確認消息以前,會致使消息沒有被確認,打開rabbitMQ控制檯,發現uncheck狀態的消息數增長。
能夠經過在消費者邏輯裏try..catch..處理,確保消息確承認以正常返回給rabbitMQ

文章參考: https://www.jianshu.com/p/2c5eebfd0e95     https://www.cnblogs.com/piaolingzxh/p/5448927.html

相關文章
相關標籤/搜索