rabbitMQ

1.rabbitMQ相對於傳統的socket通訊解決的問題java

 1)信息的發送者和接收者如何維持這個鏈接,若是一方的鏈接中斷,這期間的數據如何方式丟失?spring

 2)如何下降發送者和接收者的耦合度?json

 3)如何讓Priority(優先級)高的接收者先接到數據?app

 4)如何作到load balance?有效均衡接收者的負載?socket

 5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不一樣的數據,如何作有效的filter。maven

 6)如何作到可擴展,甚至將這個通訊模塊發到cluster上?ui

 7)如何保證接收者接收到了完整,正確的數據?spa

  AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。code

rabbitmq 服務至關於一個傳輸服務server

1.2 rabbitmq的相關概念

 Producer,數據的發送方,至關於生產者

 Consumer ,數據的接收方,至關於消費者

 Connection: 就是一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的

 Channels: 通道,虛擬鏈接。它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。

 一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的數據。 label是exchange的名字或者說是一個tag,它描述了payload,並且RabbitMQ也是經過這個label來決定把這個Message發給哪一個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

1.3 消息迴應機制

   使用ack確認消息的正確傳遞

  默認狀況下,若是一個消息被Consumer正確的接收到了,那麼該Message會被從queue(隊列)中刪除。固然也可讓多個隊列消費同一個消息(訂閱模式)。

 若是一個queue沒被任何的Consumer Subscribe(訂閱),那麼,若是這個queue有數據到達,那麼這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被當即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。

若是這個app有bug,忘記了ack,那麼RabbitMQ Server不會再發送數據給它,由於Server認爲這個Consumer處理能力有限。

ack的機制能夠起到限流的做用(Benefitto throttling):在Consumer處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的balance Consumer的load。

須要注意的是 在消息被Consumer接收處理的過程當中出現了異常(沒有處理異常),而ack的機制是消息處理完纔會迴應,所以RabbitMQ Server會一直髮同一個message。

1.4 隊列 queue

consumer和producer  均可以建立隊列,隊列是rabbitmq內容用於存儲消息的。

多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。

1.設置爲持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失

2.設置爲臨時隊列,queue中的數據在系統重啓以後就會丟失

3.設置爲自動刪除的隊列,當不存在用戶鏈接到server,隊列中的數據會被自動刪除

1.5 Exchange 交換機

    生產者將消息發送到Exchange(交接機)中,由Exchange將消息路由到queue中

1.6 routing key (路由key)

生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。

1.7 binding 

而queue須要與exchange type(交換機類型) 和binding key 綁定

RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue

1.8 Exchange Type

direct-exchange: direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。

fanout-exchange: fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。

topic-exchange: 

direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定:

  • routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
  • binding key與routing key同樣也是句點號「. 」分隔的字符串
  • binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)

以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。

headers-exchange:

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。

2. java 利用 spring的 template 對rabbit mq 進行操做

 2.1 maven 配置

//pom.xml
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.5.RELEASE</version>
    </dependency>

app.properties中的文件配置

//application-mq.xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

    <description>rabbitmq 鏈接服務配置</description>
    <!-- 鏈接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template聲明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter" />

    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  
</beans>

申明一個消息隊列Queue

//application-mq.xml
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />

durable:是否持久化

exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除

auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列

交換機定義,

//application-mq.xml
<rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

rabbit:binding:設置消息queue匹配的key,因爲exchange是direct模式,這裏的binding key 和路由key徹底匹配

發送消息的代碼:

將消息 發送到direct exchange中,而且指定路由key,queue(隊列)和路由key binding的時候,binding 也是指定的這個routingkey。

 

接收消息的代碼,經過監聽消息隊列獲取消息便可

QueueListenter 實現MessageListener,複寫onMessage方法

 

 

相關文章
相關標籤/搜索