spring你們太熟,就很少說了java
rabbitmq一個amqp的隊列服務實現,具體介紹請參考本文http://lynnkong.iteye.com/blog/1699684 spring
本文側重介紹如何將rabbitmq整合到項目中apache
ps:本文只是簡單一個整合介紹,屬於拋磚引玉,具體實現還需你們深刻研究哈..json
1.首先是生產者配置緩存
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 隊列聲明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- exchange queue binging key 綁定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 --> <bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean> <-- spring template聲明--> <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> </beans>
2.fastjson messageconver插件實現ide
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import fe.json.FastJson; public class FastJsonMessageConverter extends AbstractMessageConverter { private static Log log = LogFactory.getLog(FastJsonMessageConverter.class); public static final String DEFAULT_CHARSET = "UTF-8"; private volatile String defaultCharset = DEFAULT_CHARSET; public FastJsonMessageConverter() { super(); //init(); } public void setDefaultCharset(String defaultCharset) { this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET; } public Object fromMessage(Message message) throws MessageConversionException { return null; } public <T> T fromMessage(Message message,T t) { String json = ""; try { json = new String(message.getBody(),defaultCharset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return (T) FastJson.fromJson(json, t.getClass()); } protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; try { String jsonString = FastJson.toJson(objectToConvert); bytes = jsonString.getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(this.defaultCharset); if (bytes != null) { messageProperties.setContentLength(bytes.length); } return new Message(bytes, messageProperties); } }
3.生產者端調用this
import java.util.List; import org.springframework.amqp.core.AmqpTemplate; public class MyMqGatway { @Autowired private AmqpTemplate amqpTemplate; public void sendDataToCrQueue(Object obj) { amqpTemplate.convertAndSend("queue_one_key", obj); } }
4.消費者端配置(與生產者端大同小異)插件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 隊列聲明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- exchange queue binging key 綁定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="queue_one" ref="queueOneLitener"/> </rabbit:listener-container> </beans>
5.消費者端調用線程
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class QueueOneLitener implements MessageListener{ @Override public void onMessage(Message message) { System.out.println(" data :" + message.getBody()); } }
6.因爲消費端當隊列有數據到達時,對應監聽的對象就會被通知到,沒法作到批量獲取,批量入庫,所以能夠在消費端緩存一個臨時隊列,將mq取出來的數據存入本地隊列,後臺線程定時批量處理便可code
打個廣告:創業項目www.1zheke.com,有培訓訴求的人能夠支持一下,或者周圍有培訓需求的人,幫忙擴散一下,限量1折課,獎學拿到手軟,很是感謝