沒有找到一篇完整的文章介紹Spring如何整合Rabbitmq應用,琢磨一天搞出的一個入門的demo與夥伴們分享. html
第一步linux環境下安裝rabbitMQ,小弟用的是ubantu,不想浪費太多時間這種安裝上 java
sudo apt-get install rabbitmq-server linux
默認啓動端口5672 測試帳戶guest 密碼guest web
官方經常使用命令,想更改用戶的能夠參考,這裏不做詳細說明http://www.rabbitmq.com/man/rabbitmqctl.1.man.html spring
也能夠省去安裝115.28.141.137我的私服已配置好 json
第二步添加依賴包 app
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency>
第三步配置生產者工程服務配置 測試
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd "> <!-- 引入jdbc配置文件 --> <context:property-placeholder location="classpath:conf/jdbc.properties" /> <!-- 掃描文件(自動將servicec層注入) --> <context:component-scan base-package="mq.service" /> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" host="115.28.141.137" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 隊列聲明 --> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one" /> <!-- exchange queue binging key 綁定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 --> <bean id="jsonMessageConverter" class="mq.util.FastJsonMessageConverter"></bean> <!-- spring template聲明--> <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> </beans>
FastJsonMessageConverter 類代碼以下 ui
package mq.util; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import com.alibaba.druid.support.logging.Log; import com.alibaba.druid.support.logging.LogFactory; import com.alibaba.fastjson.JSON; public class FastJsonMessageConverter extends AbstractMessageConverter { private static Log log = LogFactory.getLog(FastJsonMessageConverter.class); public static final String DEFAULT_CHARSET = "UTF-8"; private volatile String defaultCharset = DEFAULT_CHARSET; public FastJsonMessageConverter() { super(); //init(); } public void setDefaultCharset(String defaultCharset) { this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET; } public Object fromMessage(Message message) throws MessageConversionException { return null; } @SuppressWarnings("unchecked") public <T> T fromMessage(Message message,T t) { String json = ""; try { json = new String(message.getBody(),"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return (T) JSON.parseObject(json, t.getClass()); } protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; try { String jsonString = JSON.toJSONString(objectToConvert); bytes = jsonString.getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(this.defaultCharset); if (bytes != null) { messageProperties.setContentLength(bytes.length); } return new Message(bytes, messageProperties); } }
生產者代碼Producer this
package mq.service;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ProducerMq { @Autowired private AmqpTemplate amqpTemplate; public void sendDataToCrQueue(Object obj) { amqpTemplate.convertAndSend("queue_one_key", obj); } }
第四步配置消費者服務工程配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd "> <!-- 引入jdbc配置文件 --> <context:property-placeholder location="classpath:conf/jdbc.properties" /> <!-- 掃描文件(自動將servicec層注入) --> <context:component-scan base-package="mq.service" /> <task:executor id="taskExecutor" pool-size="1-4" queue-capacity="128" /> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" host="115.28.141.137" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 隊列聲明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- exchange queue binging key 綁定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <bean id="queueOneLitener" class="mq.service.QueueOneLitener" /> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="queue_one" ref="queueOneLitener"/> </rabbit:listener-container> </beans>
監聽器代碼queueOneLitener
package mq.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class QueueOneLitener implements MessageListener{ public void onMessage(Message message) { System.out.println(" data :" +new String(message.getBody())); } }
第五步測試
消費者端控制層生產數據
控制層Controller
package mq.controller; import javax.annotation.Resource; import mq.service.ProducerMq; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; @Controller public class MessageController { @Resource private ProducerMq producer; @RequestMapping("/producer") public String producer() throws Exception { for(int i=0;i<100;i++){ producer.sendDataToCrQueue("data"+i); } return "index"; } }
運行結果