Spring整合rabbitmq

發送者 spring配置文件web

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" 
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

	<!-- 鏈接服務配置  -->
	<rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}"  username="${mq.username}" password="${mq.password}"/>

	<rabbit:admin connection-factory="connectionFactory"/>

	<!-- queue 隊列聲明-->
	<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
	<rabbit:queue id="queue_topic" durable="true" auto-delete="false" exclusive="false" name="queue_topic"/>

	<!-- exchange queue binging key 綁定,做爲點對點模式使用 -->
	<rabbit:direct-exchange name="direct-exchange" durable="true" auto-delete="false" id="exchange-redirect">
		<rabbit:bindings>
			<rabbit:binding queue="queue_one" key="queue_one_key"/>
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<!-- fanout-exchange,做爲發佈-訂閱模式使用。
		因爲RabbitMQ的發佈訂閱模型是根據多個queue,多個消費者訂閱實現的。此處聲明的exchange沒必要預先綁定queue,
		在消費者聲明queue並綁定到該exchange便可。
	 -->
	<rabbit:fanout-exchange name="fanout-exchange" durable="true" auto-delete="false" id="fanout-exchange">
    </rabbit:fanout-exchange>

	<!-- topic-exchange,做爲主題模式使用。
		匹配routingkey的模式,這裏匹配兩個queue
		queue_topic準備匹配key爲zhu.q1
		queue_topic2準備匹配key爲zhu.q2
	 -->
	<rabbit:topic-exchange name="topic-exchange">  
		<rabbit:bindings>  
			<rabbit:binding queue="queue_topic" pattern="zhu.*" />  
			<rabbit:binding queue="queue_topic2" pattern="zhu.*" />  
		</rabbit:bindings>  
	</rabbit:topic-exchange>  

    <!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 -->
    <bean id="jsonMessageConverter" class="uap.web.mq.rabbit.FastJsonMessageConverter"></bean>
    <!--
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
    -->

	<!-- spring template聲明 (點對點) -->
	<rabbit:template exchange="direct-exchange" id="amqpTemplate" retry-template="retryConnTemplate" connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>

	<!-- spring template聲明(發佈,訂閱) -->
	<rabbit:template exchange="fanout-exchange" id="fanoutTemplate"  retry-template="retryConnTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>

	<!-- 通用 template聲明 -->
	<rabbit:template id="rabbitTemplate"  retry-template="retryConnTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>

	<!-- 增長失敗重試機制,發送失敗以後,會嘗試重發三次,重發間隔(ms)爲 
	    第一次 initialInterval 
	    此後:initialInterval*multiplier > maxInterval ? maxInterval : initialInterval*multiplier。
	    配合集羣使用的時候,當mq集羣中一個down掉以後,重試機制嘗試其餘可用的mq。
	 -->
	<bean id="retryConnTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500"/>
                <property name="multiplier" value="10.0"/>
                <property name="maxInterval" value="5000"/>
            </bean>
        </property>
    </bean>
</beans>

消息發送代碼spring

package com.zhu.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class RabbitMqSender {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@Autowired
	private AmqpTemplate fanoutTemplate;

	@Autowired
	private AmqpTemplate rabbitTemplate;

	/**
	 * 點對點
	 */
	public void sendDataToCrQueue(Object obj) {
		amqpTemplate.convertAndSend("queue_one_key", obj);
	}

	/**
	 * 發送 發佈--訂閱消息
	 */
	public void sendFanoutMsg(Object obj) {
		fanoutTemplate.convertAndSend(obj);
	}

	/**
	 * 主題
	 */
	public void sendTopicMsg(String topic,Object obj) {
		rabbitTemplate.convertAndSend(topic,obj);
	}

}

發送消息json

RabbitMqSender sender = new RabbitMqSender();
String str="test1";
Object obj = (Object)str;
sender.sendDataToCrQueue(obj);
sender.sendFanoutMsg(obj);
sender.sendTopicMsg("zhu.p1",obj);
sender.sendTopicMsg("zhu.p2",obj);

接受者ide

spring配置文件插件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" 
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

	<!-- 鏈接服務配置  -->
	<rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}"  username="${mq.username}" password="${mq.password}"/>

	<rabbit:admin connection-factory="connectionFactory"/>

	<!-- queue 隊列聲明(direct)  -->
	<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
	<!-- 聲明的訂閱模型的queue(fanout) -->
	<rabbit:queue id="fanout-subscribe" durable="true" auto-delete="false" exclusive="false" name="fanout-subscribe"/>
	<rabbit:queue id="fanout-subscribe2" durable="true" auto-delete="false" exclusive="false" name="fanout-subscribe2"/>
	<!-- 聲明的訂閱模型的queue(topic) -->
	<rabbit:queue id="queue_topic" durable="true" auto-delete="false" exclusive="false" name="queue_topic"/>
	<rabbit:queue id="queue_topic2" durable="true" auto-delete="false" exclusive="false" name="queue_topic2"/>

    <!-- 將訂閱的queue綁定到fanout-exchange上 -->
    <rabbit:fanout-exchange name="fanout-exchange" durable="true" auto-delete="false" id="fanout-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="fanout-subscribe"></rabbit:binding>
            <rabbit:binding queue="fanout-subscribe2"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- 定義queue監聽器 -->
	<bean id="DirectListener" class="com.zhu.mq.DirectListener"></bean>
	<bean id="FanoutLitener" class="com.zhu.mq.FanoutLitener"></bean>
	<bean id="TopicLitener" class="com.zhu.mq.TopicLitener"></bean>

    <!-- direct監聽器綁定-->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="queue_one" ref="DirectListener"/>
	</rabbit:listener-container>

    <!-- fanout監聽器綁定 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="fanout-subscribe" ref="FanoutLitener"/>
	</rabbit:listener-container>

	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="fanout-subscribe2" ref="FanoutLitener"/>
	</rabbit:listener-container>

	<!-- topic監聽器綁定-->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="queue_topic" ref="TopicLitener"/>
	</rabbit:listener-container>

	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="queue_topic2" ref="TopicLitener"/>
	</rabbit:listener-container>

</beans>

監聽器代碼 direct監聽器code

package com.zhu.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class DirectListener implements MessageListener {

	private static Logger logger = LoggerFactory.getLogger(DirectListener.class);

	@Override
	public void onMessage(Message message) {
		logger.info("Direct queue data:" + new String(message.getBody()));
	}
}

fanout監聽器xml

package com.zhu.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class FanoutLitener implements MessageListener {

	private static Logger logger = LoggerFactory.getLogger(FanoutLitener.class);

	@Override
	public void onMessage(Message message) {
		logger.info("Fanout queue data:" + new String(message.getBody()));
	}
}

topic監聽器隊列

package com.zhu.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TopicLitener implements MessageListener {

	private static Logger logger = LoggerFactory.getLogger(TopicLitener.class);

	@Override
	public void onMessage(Message message) {
		logger.info("Topic queue data:" + new String(message.getBody()));
	}
}
相關文章
相關標籤/搜索