第一步:下載erlang
http://www.erlang.org/downloads/19.3html
第二步:下載rabbitMQ
http://www.rabbitmq.com/download.htmlspring
rabbitMQ安裝完成後,打開rabbitMQ控制檯 輸入:rabbitmq-plugins enable rabbitmq_management
打開網址 http://127.0.0.1:15672/ 默認帳號 guest/guestapi
項目配置:
1、pom文件的配置:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>瀏覽器
或者服務器
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.5.RELEASE</version>
</dependency>網絡
mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672mybatis
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <rabbit:connection-factory id="connectionFactory" username="${mq.username}" password="${mq.password}" host="${mq.host}" port="${mq.port}" virtual-host="/" /> <!-- 定義rabbit template 用於數據的接收和發送 --> <rabbit:template id="amqTemplate" connection-factory="connectionFactory" exchange="Q_PAY_PPMS_RECON"></rabbit:template> <!-- 經過指定下面的admin信息,當前productor中的exchange和queue會在rabbitmq服務器上自動生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定義queue 說明:durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列--> <rabbit:queue name="chase1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="chase2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="chase3" durable="true" auto-delete="false" exclusive="false" /> <!--topic 模式:發送端不是按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此。 --> <rabbit:topic-exchange name="mq.asdfExChange" durable="true" auto-delete="false"> <rabbit:bindings> <!-- 配置多個消費者 根據不一樣業務類型選擇對應消費者 pattern="*.*.test1" --> <rabbit:binding queue="chase1" pattern="mq.*.send"></rabbit:binding> <rabbit:binding queue="chase2" pattern="mq.*.send"></rabbit:binding> <rabbit:binding queue="chase3" pattern="mq.*.send"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- fanout 模式:客戶端中只要是與該路由綁定在一塊兒的隊列都會收到相關消息,這相似廣播,發送端無論隊列是誰,都由客戶端本身去綁定,誰須要數據誰去綁定本身的相應隊列 --> <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange"> <rabbit:bindings> <rabbit:binding queue="chase1"/> <rabbit:binding queue="chase2"/> <rabbit:binding queue="chase3"/> </rabbit:bindings> </rabbit:fanout-exchange> <!--定義direct-exchange direct 消息轉換隊列 綁定key,意思就是消息與一個特定的路由鍵匹配,會轉發。rabbit:binding:設置消息queue匹配的key --> <rabbit:direct-exchange name="mq.qwerExChange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="chase1" key="mq.qwer.send" ></rabbit:binding> <rabbit:binding queue="chase2" key="mq.qwer.send2" ></rabbit:binding> <rabbit:binding queue="chase3" key="mq.qwer.send3" ></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="asdfConsumer" class="com.rabbitmq.Consumor"></bean> <bean id="asdfConsumer2" class="com.rabbitmq.Consumor2"></bean> <bean id="qwerConsumer3" class="com.rabbitmq.Consumor3"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 acknowledeg = "manual",意爲表示該消費者的ack方式爲手動--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="chase1" ref="asdfConsumer"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener queues="chase2" ref="asdfConsumer2"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener queues="chase3" ref="qwerConsumer3"/> </rabbit:listener-container> </beans>
4、spring主配置文件中引入剛剛添加的配置文件測試
<!-- 加載配置文件 --> <context:property-placeholder location="classpath:rabbitMQ.properties" /> <import resource="spring-rabbitmq.xml"></import>
5、生產者生產消息spa
在代碼中注入:
@Autowired
private AmqpTemplate amqpTemplate;
以下能夠將消息發送到交換器(mq.asdfExChange)中,交換器的工做模式爲topic模式,根據工做模式將消息發送到隊列chase1,chase2,chase3:code
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdfExChange.send", msg);
6、消費者消費消息
定義在spring-mybatis.xml中配置的消費者:com.rabbitmq.Consumor和com.rabbitmq.Consumor2和com.rabbitmq.Consumor3
這裏只定義一個Consumor供參考:
public class Consumor implements ChannelAwareMessageListener { /** * message 消息實體 * Channel 當前通道 */ public void onMessage(Message message, Channel channel) throws Exception { System.out.println("消費者接收到信息"); String msg = new String(message.getBody(), "utf-8"); //消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //ack返回false,並從新回到隊列,api裏面解釋得很清楚 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //true拒絕消息 false確認接受到消息 //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); System.out.println("消費者消費掉了消息:" + msg); } }
7、消息確認機制
首先介紹一下消息確認的幾種類型:
AcknowledgeMode.NONE:自動確認
AcknowledgeMode.AUTO:根據狀況確認
AcknowledgeMode.MANUAL:手動確認
消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠自動去ACK或手動ACK(在spring-mybatis.xml中配置acknowledge="manual")
1.自動確認會在消息發送給消費者後當即確認,但存在丟失消息的可能,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息
2.根據狀況確認會在消費者掛掉,待ack的消息迴歸到隊列中。消費者拋出異常,消息會不斷的被重發,直處處理成功。不會丟失消息,即使服務掛掉,沒有處理完成的消息會重回隊列,可是異常會讓消息不斷重試
3.若是消息已經被處理,但後續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也一樣形成了實際意義的消息丟失
4.若是手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確承認以在業務失敗後進行一些操做,若是消息未被 ACK 則會發送到下一個消費者(下一個消費者指的是什麼,沒有下一個消費者會怎樣)
5.若是某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,由於 RabbitMQ 認爲該服務的處理能力有限,標記爲uncheck狀態
6.手動確認方法 channel.basicAck(deliveryTag, multiple)
deliveryTag(惟一標識 ID):當一個消費者向 RabbitMQ 註冊後,會創建起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 deliveryTag,
它表明了 RabbitMQ 向該 Channel 投遞的這條消息的惟一標識 ID,是一個單調遞增的正整數,deliveryTag 的範圍僅限於 Channel
multiple:爲了減小網絡流量,手動確承認以被批處理,當該參數爲 true 時,則能夠一次性確認 deliveryTag 小於等於傳入值的全部消息
7.否定消息方法 channel.basicNack(deliveryTag, multiple, requeue)
multiple:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。
requeue:被拒絕的是否從新入隊列
8.拒絕消息方法 channel.basicReject(deliveryTag, requeue)
requeue:被拒絕的是否從新入隊列
問題1:剛開始運行一切OK,後來我把<rabbit:topic-exchange>下的chase2和chase3刪除只留下chase1,結果發現發給chase1的信息,仍是同時發給了chase2和chase3
1.確認配置文件是否正常
2.瀏覽器打開 http://127.0.0.1:15672 找到設置的交換器,好比我設置的交換器 mq.asdfExChange ,發現下面綁定channel列表裏依然是3個分別是chase1,chase2,chase3
原來雖然在配置文件更新了,可是在rabbitMQ裏沒有進行同步更新
問題2:rabbitMQ後臺管理中的ready、unchecked、total分別是什麼意思
ready是準備發送的消息 unchecked是消費了未確認 total=ready+unchecked
問題3:消費者消費消息時候出錯如何處理
通過測試發現,若是異常出如今消息確認以後,不影響後面的消息推送。
若是異常出如今確認消息以前,會致使消息沒有被確認,打開rabbitMQ控制檯,發現uncheck狀態的消息數增長。
能夠經過在消費者邏輯裏try..catch..處理,確保消息確承認以正常返回給rabbitMQ
文章參考: https://www.jianshu.com/p/2c5eebfd0e95 https://www.cnblogs.com/piaolingzxh/p/5448927.html