消息隊列 RabbitMQ 與 Spring 整合使用

1、什麼是 RabbitMQ

RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。html

 

RabbitMQ 是由 Erlang 語言開發,安裝 RabbitMQ 服務須要先安裝 Erlang 語言包。java

在 CentOS7上安裝 RabbitMQ 請點擊:spring

http://www.cnblogs.com/libra0920/p/7920698.html服務器

 

2、如何與 Spring 集成

1. 咱們都須要哪些 Jar 包?app

拋開單獨使用 Spring 的包不說,引入 RabbitMQ 咱們還須要兩個:框架

<!-- RabbitMQ -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

 

2. 使用外部參數文件 application.properties:分佈式

mq.host=127.0.0.1
mq.username=queue
mq.password=1234
mq.port=8001
# 統一XML配置中易變部分的命名
mq.queue=test_mq

易變指的是在實際項目中,若是測試與生產環境使用的同一個 RabbitMQ 服務器。那咱們在部署時直接修改 properties 文件的參數便可,防止測試與生產環境混淆。性能

 

修改 applicationContext.xml 文件,引入咱們建立的 properties 文件測試

<context:property-placeholder location="classpath:application.properties"/>
<util:properties id="appConfig" location="classpath:application.properties"></util:properties>

 

3. 鏈接 RabbitMQ 服務器優化

<!-- 鏈接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
        password="${mq.password}" port="${mq.port}"  />
        
<rabbit:admin connection-factory="connectionFactory"/>

 

4. 聲明一個 RabbitMQ Template

<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

 

5. 在 applicationContext.xml 中聲明一個交換機。

<rabbit:topic-exchange> 標籤的 name 屬性就是在 RabbitMQ 服務器配置交換機的 name 值。

<rabbit:binding> 標籤的 queue 屬性是 6. <queue> 標籤的 id 屬性。

<rabbit:binding> 標籤的 pattern 屬性是在 RabbitMQ 服務器配置交換機與隊列綁定時的 Routing key 值(路由

<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

 

交換機的四種模式:

  • direct:轉發消息到 routigKey 明確指定的隊列。
  • topic:對 key 進行模式匹配,好比ab*能夠傳到到全部 ab* 的 queue。
  • headers:轉發到 headers 中的鍵值對徹底匹配的隊列。性能比較差不建議使用。
  • fanout:轉發消息到全部綁定隊列,忽略 routigKey

交換器的屬性:

  • 持久性:若是啓用,交換器將會在server重啓前都有效。
  • 自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身。
  • 惰性:若是沒有聲明交換器,那麼在執行到使用的時候會致使異常,並不會主動聲明。

 

若是沒有隊列綁定在交換機上,則發送到該交換機上的消息會丟失。

 

一個交換機能夠綁定多個隊列,一個隊列能夠被多個交換機綁定。

 

topic 類型交換器經過模式匹配分析消息的 routing-key 屬性。它將 routing-key 和 binding-key 的字符串切分紅單詞。這些單詞之間用點隔開。它一樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,可是不匹配 stock.nana。

 

由於交換器是命名實體,聲明一個已經存在的交換器,可是試圖賦予不一樣類型是會致使錯誤。客戶端須要刪除這個已經存在的交換器,而後從新聲明而且賦予新的類型。

 

6. 在 applicationContext.xml 中聲明一個隊列。

<rabbit:queue> 標籤的 name 屬性就是在 RabbitMQ 服務器中 Queue 的 name。

<rabbit:queue> 標籤的 id 屬性是上面 <rabbit:binding> 標籤的 queue 屬性。

<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
  • durable:是否持久化
  • exclusive:僅建立者可使用的私有隊列,斷開後自動刪除
  • auto-delete:當全部消費端鏈接斷開後,是否自動刪除隊列

 

7. 建立生產者端

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description: 消息隊列發送者
 * @Author: 
 * @CreateTime: 
 */
@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;
    
    public void sendQueue(String exchange_key, String queue_key, Object object) {
        // convertAndSend 將Java對象轉換爲消息發送至匹配key的交換機中Exchange
        amqpTemplate.convertAndSend(exchange_key, queue_key, object);
    }
}

 

8. 在 applicationContext.xml 中配置監聽及消費者端

<!-- 消費者 -->
<bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
    
<!-- 配置監聽 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
  <!-- 
    queues 監聽隊列,多個用逗號分隔 
    ref 監聽器
  -->
  <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>

 

消費者 Java 代碼:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class RabbitmqService implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("消息消費者 = " + message.toString());
    }
}

 

至此,咱們的全部配置文件就寫完了,最終以下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xmlns:util="http://www.springframework.org/schema/util"
     xmlns:aop="http://www.springframework.org/schema/aop"
     xmlns:tx="http://www.springframework.org/schema/tx"
     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xmlns:p="http://www.springframework.org/schema/p"
     xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-3.0.xsd
     http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
     
    <!-- RabbitMQ start -->
    <!-- 鏈接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
        password="${mq.password}" port="${mq.port}"  />
        
    <rabbit:admin connection-factory="connectionFactory"/>
    
    <!-- 消息隊列客戶端 -->
    <rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />
    
    <!-- queue 隊列聲明 -->
    <!-- 
        durable 是否持久化 
        exclusive 僅建立者可使用的私有隊列,斷開後自動刪除 
        auto-delete 當全部消費端鏈接斷開後,是否自動刪除隊列 -->
    <rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
    
    <!-- 交換機定義 -->
    <!-- 
        交換機:一個交換機能夠綁定多個隊列,一個隊列也能夠綁定到多個交換機上。
        若是沒有隊列綁定到交換機上,則發送到該交換機上的信息則會丟失。
        
        direct模式:消息與一個特定的路由器徹底匹配,纔會轉發
        topic模式:按模式匹配
     -->
    <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <!-- 設置消息Queue匹配的pattern (direct模式爲key) -->
            <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
    
    <!-- 配置監聽 消費者 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <!-- 
            queues 監聽隊列,多個用逗號分隔 
            ref 監聽器 -->
        <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
    </rabbit:listener-container>
</beans>

 

9. 如何使用 RabbitMQ 發送一個消息

@Autowired
private Producer producer;
@Value("#{appConfig['mq.queue']}")
private String queueId;
    
/**
 * @Description: 消息隊列
 * @Author: 
 * @CreateTime: 
 */
@ResponseBody
@RequestMapping("/sendQueue")
public String testQueue() {
    try {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("data", "hello rabbitmq");
        // 注意:第二個屬性是 Queue 與 交換機綁定的路由
        producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "發送完畢";
}

 

嗯。這個是 SpringMVC 框架的 Controller 類。

 

3、優化 - 將消費者由類級別改成方法級別。

1. 重寫消費者 Java 代碼

import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    public void getMessage(Map<String, Object> message) {
        System.out.println("消費者:" + message);
    }
    
}

 

2. 更改 applicationContext 配置文件

將:

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <!-- queues 監聽隊列,多個用逗號分隔 ref 監聽器 -->
    <rabbit:listener queues="test_queue" ref="consumerService" />
</rabbit:listener-container>

改成:

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <!-- queues 監聽隊列,多個用逗號分隔 ref 監聽器 -->
    <rabbit:listener queues="test_queue" ref="consumerService" method="getMessage" />
</rabbit:listener-container>

 

完畢。

相關文章
相關標籤/搜索