發送者 spring配置文件web
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}" username="${mq.username}" password="${mq.password}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 隊列聲明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <rabbit:queue id="queue_topic" durable="true" auto-delete="false" exclusive="false" name="queue_topic"/> <!-- exchange queue binging key 綁定,做爲點對點模式使用 --> <rabbit:direct-exchange name="direct-exchange" durable="true" auto-delete="false" id="exchange-redirect"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- fanout-exchange,做爲發佈-訂閱模式使用。 因爲RabbitMQ的發佈訂閱模型是根據多個queue,多個消費者訂閱實現的。此處聲明的exchange沒必要預先綁定queue, 在消費者聲明queue並綁定到該exchange便可。 --> <rabbit:fanout-exchange name="fanout-exchange" durable="true" auto-delete="false" id="fanout-exchange"> </rabbit:fanout-exchange> <!-- topic-exchange,做爲主題模式使用。 匹配routingkey的模式,這裏匹配兩個queue queue_topic準備匹配key爲zhu.q1 queue_topic2準備匹配key爲zhu.q2 --> <rabbit:topic-exchange name="topic-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_topic" pattern="zhu.*" /> <rabbit:binding queue="queue_topic2" pattern="zhu.*" /> </rabbit:bindings> </rabbit:topic-exchange> <!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 --> <bean id="jsonMessageConverter" class="uap.web.mq.rabbit.FastJsonMessageConverter"></bean> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean> --> <!-- spring template聲明 (點對點) --> <rabbit:template exchange="direct-exchange" id="amqpTemplate" retry-template="retryConnTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> <!-- spring template聲明(發佈,訂閱) --> <rabbit:template exchange="fanout-exchange" id="fanoutTemplate" retry-template="retryConnTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> <!-- 通用 template聲明 --> <rabbit:template id="rabbitTemplate" retry-template="retryConnTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> <!-- 增長失敗重試機制,發送失敗以後,會嘗試重發三次,重發間隔(ms)爲 第一次 initialInterval 此後:initialInterval*multiplier > maxInterval ? maxInterval : initialInterval*multiplier。 配合集羣使用的時候,當mq集羣中一個down掉以後,重試機制嘗試其餘可用的mq。 --> <bean id="retryConnTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500"/> <property name="multiplier" value="10.0"/> <property name="maxInterval" value="5000"/> </bean> </property> </bean> </beans>
消息發送代碼spring
package com.zhu.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; public class RabbitMqSender { @Autowired private AmqpTemplate amqpTemplate; @Autowired private AmqpTemplate fanoutTemplate; @Autowired private AmqpTemplate rabbitTemplate; /** * 點對點 */ public void sendDataToCrQueue(Object obj) { amqpTemplate.convertAndSend("queue_one_key", obj); } /** * 發送 發佈--訂閱消息 */ public void sendFanoutMsg(Object obj) { fanoutTemplate.convertAndSend(obj); } /** * 主題 */ public void sendTopicMsg(String topic,Object obj) { rabbitTemplate.convertAndSend(topic,obj); } }
發送消息json
RabbitMqSender sender = new RabbitMqSender(); String str="test1"; Object obj = (Object)str; sender.sendDataToCrQueue(obj); sender.sendFanoutMsg(obj); sender.sendTopicMsg("zhu.p1",obj); sender.sendTopicMsg("zhu.p2",obj);
接受者ide
spring配置文件插件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}" username="${mq.username}" password="${mq.password}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 隊列聲明(direct) --> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- 聲明的訂閱模型的queue(fanout) --> <rabbit:queue id="fanout-subscribe" durable="true" auto-delete="false" exclusive="false" name="fanout-subscribe"/> <rabbit:queue id="fanout-subscribe2" durable="true" auto-delete="false" exclusive="false" name="fanout-subscribe2"/> <!-- 聲明的訂閱模型的queue(topic) --> <rabbit:queue id="queue_topic" durable="true" auto-delete="false" exclusive="false" name="queue_topic"/> <rabbit:queue id="queue_topic2" durable="true" auto-delete="false" exclusive="false" name="queue_topic2"/> <!-- 將訂閱的queue綁定到fanout-exchange上 --> <rabbit:fanout-exchange name="fanout-exchange" durable="true" auto-delete="false" id="fanout-exchange"> <rabbit:bindings> <rabbit:binding queue="fanout-subscribe"></rabbit:binding> <rabbit:binding queue="fanout-subscribe2"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 定義queue監聽器 --> <bean id="DirectListener" class="com.zhu.mq.DirectListener"></bean> <bean id="FanoutLitener" class="com.zhu.mq.FanoutLitener"></bean> <bean id="TopicLitener" class="com.zhu.mq.TopicLitener"></bean> <!-- direct監聽器綁定--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="queue_one" ref="DirectListener"/> </rabbit:listener-container> <!-- fanout監聽器綁定 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="fanout-subscribe" ref="FanoutLitener"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="fanout-subscribe2" ref="FanoutLitener"/> </rabbit:listener-container> <!-- topic監聽器綁定--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="queue_topic" ref="TopicLitener"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="queue_topic2" ref="TopicLitener"/> </rabbit:listener-container> </beans>
監聽器代碼 direct監聽器code
package com.zhu.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class DirectListener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(DirectListener.class); @Override public void onMessage(Message message) { logger.info("Direct queue data:" + new String(message.getBody())); } }
fanout監聽器xml
package com.zhu.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class FanoutLitener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(FanoutLitener.class); @Override public void onMessage(Message message) { logger.info("Fanout queue data:" + new String(message.getBody())); } }
topic監聽器隊列
package com.zhu.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class TopicLitener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(TopicLitener.class); @Override public void onMessage(Message message) { logger.info("Topic queue data:" + new String(message.getBody())); } }