<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions using the default beans namespace plus the rabbit namespace.
     Each namespace is mapped to its XSD exactly once in xsi:schemaLocation. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- RabbitMQ connection settings. If the broker runs on a remote server, create a
     dedicated user and put its username/password here: by default the guest
     account is not allowed to log in remotely. -->
<rabbit:connection-factory id="connectionFactory"
                           host="localhost"
                           port="5672"
                           virtual-host="/"
                           username="guest"
                           password="guest"
                           channel-cache-size="5"/>
<!-- Admin bean: automatically declares the exchanges and queues defined in this
     configuration file, so they do not need to be created by hand. -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- Queue declaration -->
<rabbit:queue name="spring.queue.tag"
              durable="true"
              exclusive="false"
              auto-delete="false"/>
<!-- Direct exchange, with the queue bound to it under a routing key -->
<rabbit:direct-exchange name="spring.queue.exchange"
                        durable="true"
                        auto-delete="false">
  <rabbit:bindings>
    <rabbit:binding queue="spring.queue.tag"
                    key="spring.queue.tag.key"/>
  </rabbit:bindings>
</rabbit:direct-exchange>
<!-- Message converter that serializes the producer's payload to JSON before it is put
     on the queue. A Gson-based implementation is used here instead of Spring AMQP's
     Jackson-based JSON converter; the original author's stated reason is that Gson is
     faster than Jackson (claim not verified here). -->
<bean id="jsonMessageConverter"
      class="sendMQ.Gson2JsonMessageConverter"/>
<!-- Template declaration: messages sent without an explicit exchange or routing key
     use the defaults configured here and are converted by jsonMessageConverter. -->
<rabbit:template id="amqpTemplate"
                 connection-factory="connectionFactory"
                 message-converter="jsonMessageConverter"
                 exchange="spring.queue.exchange"
                 routing-key="spring.queue.tag.key"/>
發送端代碼:GSON配置
package sendMQ;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.gson.Gson;
import com.google.gson.JsonParseException;

/**
 * JSON message converter that uses Gson to serialize outgoing payloads and
 * deserialize incoming message bodies.
 *
 * <p>Type information is written to and read from the message properties through
 * a {@link ClassMapper}. The same mapper is used in both directions: the one
 * returned by {@link #getClassMapper()} when it is non-null, otherwise a shared
 * {@link DefaultClassMapper}.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {

    private static final Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Fallback mapper used when no class mapper has been set on this converter. */
    private static final ClassMapper classMapper = new DefaultClassMapper();

    private static final Gson gson = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON using the default charset and fills in the
     * content type, content encoding, content length and class-mapping information
     * on {@code messageProperties}.
     *
     * @param object            payload to serialize
     * @param messageProperties properties to populate and attach to the message
     * @return a message whose body is the JSON bytes of {@code object}
     * @throws MessageConversionException if the default charset is not supported
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            bytes = gson.toJson(object).getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Use the same mapper that fromMessage() reads with, so both directions agree.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts a message whose content type contains "json" back into an object of
     * the class recorded in its properties. Messages without properties, or with any
     * other content type, are returned as their raw body.
     *
     * @param message incoming message
     * @return the deserialized object, or the raw body when no conversion took place
     *         (including when the JSON deserialized to {@code null})
     * @throws MessageConversionException if the encoding is unsupported or the body
     *                                    is not valid JSON for the target class
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                } catch (JsonParseException e) {
                    // Report malformed JSON through the exception type this method declares
                    // instead of leaking a Gson runtime exception to the caller.
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    /** Returns the mapper set on this converter, or the shared default when none is set. */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : classMapper;
    }

    /** Decodes {@code body} with {@code encoding} and parses it as JSON into {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return gson.fromJson(contentAsString, clazz);
    }
}
發送類接口:
/**
 * Producer-side contract for publishing messages.
 */
public interface MQProducer {

    /**
     * Sends a message to the specified queue.
     *
     * @param queueKey key of the queue to send to
     * @param object   message payload
     */
    void sendDataToQueue(String queueKey, Object object);
}
實現類:test 方法是測試用的。
package sendMQ;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * {@link MQProducer} implementation backed by an injected {@link AmqpTemplate}.
 *
 * <p>The class also carries JUnit/Spring test annotations so that {@link #test()}
 * can be run directly against the context loaded from {@code spring-common.xml}.
 */
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:/spring-common.xml" })
@Component
public class MQProducerImpl implements MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code object} and sends it through the template, using
     * {@code queueKey} as the routing key. When {@code queueKey} is {@code null}
     * the template's own default routing key is used instead.
     *
     * <p>Failures are reported on standard output and not rethrown.
     *
     * @param queueKey routing key to send with, or {@code null} for the template default
     * @param object   message payload
     */
    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        System.out.println("--" + amqpTemplate);
        try {
            if (queueKey == null) {
                amqpTemplate.convertAndSend(object);
            } else {
                // Honor the caller-supplied key rather than silently ignoring it.
                amqpTemplate.convertAndSend(queueKey, object);
            }
            System.out.println("------------消息發送成功");
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Manual test driver: sends the same sample message every two seconds with the
     * template's default exchange and routing key, until the thread is interrupted.
     */
    @Test
    public void test() {
        Map<String, Object> msg = new HashMap<>();
        msg.put("data", "hello,456");
        while (!Thread.currentThread().isInterrupted()) {
            amqpTemplate.convertAndSend(msg);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the test.
                Thread.currentThread().interrupt();
            }
        }
    }
}
接收端配置:
<!-- Connection settings -->
<rabbit:connection-factory id="connectionFactory"
                           host="localhost"
                           port="5672"
                           virtual-host="/"
                           username="guest"
                           password="guest"
                           channel-cache-size="5"/>

<rabbit:admin connection-factory="connectionFactory"/>

<!-- Queue declaration -->
<rabbit:queue name="spring.queue.tag"
              durable="true"
              exclusive="false"
              auto-delete="false"/>

<!-- Direct exchange, with the queue bound to it under a routing key -->
<rabbit:direct-exchange name="spring.queue.exchange"
                        durable="true"
                        auto-delete="false">
  <rabbit:bindings>
    <rabbit:binding queue="spring.queue.tag"
                    key="spring.queue.tag.key"/>
  </rabbit:bindings>
</rabbit:direct-exchange>

<bean id="receiveMessageListener"
      class="receiveMQ.QueueListenter"/>

<!-- Queue listener (observer style): when a message arrives, the listener object
     registered on the corresponding queue is notified. -->
<rabbit:listener-container connection-factory="connectionFactory"
                           acknowledge="auto">
  <rabbit:listener queues="spring.queue.tag"
                   ref="receiveMessageListener"/>
</rabbit:listener-container>
接收端代碼:
package receiveMQ;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Message listener that prints the body of every received message to standard
 * output, decoded as UTF-8.
 */
public class QueueListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it after a dashed prefix,
     * without a trailing newline.
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        final String text;
        try {
            text = new String(msg.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return;
        }
        System.out.print("-------------------" + text);
    }
}
接收端測試啓動:
package receiveMQ;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point for the receiving side: loads the Spring context from
 * {@code spring-common.xml} on the classpath.
 */
public class ConsumerMain {

    /**
     * Starts the application context and registers a JVM shutdown hook so that the
     * context is closed in an orderly way when the JVM exits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The context is deliberately not closed here: it has to stay open after main
        // returns so the beans it defines keep running (presumably the listener
        // container configured in spring-common.xml, TODO confirm).
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring-common.xml");
        context.registerShutdownHook();
    }
}
上面代碼均有註釋,應該不難看懂,複製即可使用,實現了MQ的簡單功能。
說明:可以配置多個接收端,spring默認的是負載均衡機制,各個接收端輪流接收消息(每次一條);這些擴展功能待後面有時間再講解。