1.rabbitMQ相對於傳統的socket通訊解決的問題java
1)信息的發送者和接收者如何維持這個鏈接,若是一方的鏈接中斷,這期間的數據如何方式丟失?spring
2)如何下降發送者和接收者的耦合度?json
3)如何讓Priority(優先級)高的接收者先接到數據?app
4)如何作到load balance?有效均衡接收者的負載?socket
5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不一樣的數據,如何作有效的filter。maven
6)如何作到可擴展,甚至將這個通訊模塊發到cluster上?ui
7)如何保證接收者接收到了完整,正確的數據?spa
AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。code
rabbitmq 服務至關於一個傳輸服務server
1.2 rabbitmq的相關概念
Producer,數據的發送方,至關於生產者
Consumer ,數據的接收方,至關於消費者
Connection: 就是一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的
Channels: 通道,虛擬鏈接。它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。
一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的數據。 label是exchange的名字或者說是一個tag,它描述了payload,並且RabbitMQ也是經過這個label來決定把這個Message發給哪一個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。
1.3 消息迴應機制
使用ack確認消息的正確傳遞
默認狀況下,若是一個消息被Consumer正確的接收到了,那麼該Message會被從queue(隊列)中刪除。固然也可讓多個隊列消費同一個消息(訂閱模式)。
若是一個queue沒被任何的Consumer Subscribe(訂閱),那麼,若是這個queue有數據到達,那麼這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被當即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
若是這個app有bug,忘記了ack,那麼RabbitMQ Server不會再發送數據給它,由於Server認爲這個Consumer處理能力有限。
ack的機制能夠起到限流的做用(Benefitto throttling):在Consumer處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的balance Consumer的load。
須要注意的是 在消息被Consumer接收處理的過程當中出現了異常(沒有處理異常),而ack的機制是消息處理完纔會迴應,所以RabbitMQ Server會一直髮同一個message。
1.4 隊列 queue
consumer和producer 均可以建立隊列,隊列是rabbitmq內容用於存儲消息的。
多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
1.設置爲持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
2.設置爲臨時隊列,queue中的數據在系統重啓以後就會丟失
3.設置爲自動刪除的隊列,當不存在用戶鏈接到server,隊列中的數據會被自動刪除
1.5 Exchange 交換機
生產者將消息發送到Exchange(交接機)中,由Exchange將消息路由到queue中
1.6 routing key (路由key)
生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。
1.7 binding
而queue須要與exchange type(交換機類型) 和binding key 綁定
RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue
1.8 Exchange Type
direct-exchange: direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。
fanout-exchange: fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。
topic-exchange:
direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定:
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。
headers-exchange:
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。
2. java 利用 spring的 template 對rabbit mq 進行操做
2.1 maven 配置
//pom.xml <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
app.properties中的文件配置
//application-mq.xml <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 鏈接服務配置</description> <!-- 鏈接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template聲明--> <rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> </beans>
//application-mq.xml <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
durable:是否持久化
exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除
auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
交換機定義,
//application-mq.xml <rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="test_queue_key" key="test_queue_key"/> </rabbit:bindings> </rabbit:direct-exchange>
rabbit:binding:設置消息queue匹配的key,因爲exchange是direct模式,這裏的binding key 和路由key徹底匹配
發送消息的代碼:
將消息 發送到direct exchange中,而且指定路由key,queue(隊列)和路由key binding的時候,binding 也是指定的這個routingkey。
接收消息的代碼,經過監聽消息隊列獲取消息便可
QueueListenter 實現MessageListener,複寫onMessage方法