spring中使用RabbitMQ

常見的消息中間件產品:java

(1)ActiveMQspring

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。json

(2)RabbitMQ服務器

AMQP協議的領導實現,支持多種場景。淘寶的MySQL集羣內部有使用它進行通信,OpenStack開源雲平臺的通訊組件,最早在金融行業獲得運用。咱們在本次課程中介紹 RabbitMQ的使用。架構

(3)ZeroMQapp

史上最快的消息隊列系統異步

(4)Kafka分佈式

Apache下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據。ide

(5)RocketMQ 阿里巴巴性能

消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見的角色大體也就有Producer(生產者)、Consumer(消費者)。

消息隊列中間件是分佈式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。

​ Spring-amqp是對AMQP協議的抽象實現,而spring-rabbit 是對協議的具體實現,也是目前的惟一實現。底層使用的就是RabbitMQ。

已經配置好了ssm的開發環境

1.導入依賴

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.5</version>
    </dependency>
</dependencies>

2.編寫生產者

2.1配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <context:component-scan base-package="cn.test.rabbitmq.spring"/>

<!-- 配置鏈接工廠 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                           host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- 聲明隊列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

<!-- 定義交換機綁定隊列(路由模式) -->
<rabbit:direct-exchange  name="spring.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="spring.test.queue" key="user.insert" />
    </rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定義交換機綁定隊列(路由模式)使用匹配符
<rabbit:topic-exchange  id="springTestExchange" name="spring.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="spring.test.queue" pattern="#.#" />
    </rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 消息對象json轉換類 -->
<bean id="jsonMessageConverter"
      class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

<!-- 定義模版 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                 exchange="spring.test.exchange"
                 message-converter="jsonMessageConverter"/>

</beans>

2.2 發送方代碼

這裏是往RabbitMQ隊列中放入任務,讓消費者去取

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void sendMessage(){
        //根據key發送到對應的隊列
        amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
        System.out.println("發送成功........");
    }
}

2.3 測試代碼

package cn.test.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-send.xml")
public class MqSendDemo {

    @Autowired
    private MqSender mqSender;
    @Test
    public void test(){
        //根據key發送到對應的隊列
        mqSender.sendMessage();
    }
}

3.編寫消費者

3.1 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!-- 配置鏈接工廠 -->
    <rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                               host="127.0.0.1" port="5672" username="saas" password="saas" />
    <!-- 定義mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 聲明隊列 -->
    <rabbit:queue  name="spring.test.queue" auto-declare="true" durable="true" />

    <!-- 定義消費者 -->
    <bean id="testMqListener" class="cn.test.rabbitmq.spring.MqListener" />

    <!-- 定義消費者監聽隊列 -->
    <rabbit:listener-container
            connection-factory="connectionFactory">
        <rabbit:listener ref="testMqListener" queues="spring.test.queue" />
    </rabbit:listener-container>

</beans>

3.2 監聽代碼

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

public class MqListener implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println(message.getBody());
            String ms = new String(message.getBody(), "UTF-8");
            System.out.println(ms);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.3 測試代碼

package cn.itcast.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-receive.xml")
public class MqReceiveDemo {

    @Test
    public void test(){
      //等待隊列中放入任務,若是有任務,當即消費任務
        while (true){
        }
    }
}
相關文章
相關標籤/搜索