經過maven方式,應用activemq依賴包,pom.xml 添加以下信息,java
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
配置server.properties文件以下,spring
#activity config activity.mq.brokerURL=failover:(tcp://127.0.0.1:61616)
配置相關bean,包括監聽,消息發送,以及broker,queue/topic ,以下spring-activemq.xml數據庫
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> <context:annotation-config/> <!-- 配置JMS鏈接工廠 --> <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activity.mq.brokerURL}" /> <property name="useAsyncSend" value="true" /> <property name="clientID" value="providerClientConnect" /> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/> </bean> <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#queueDestination" maximumRedeliveries="10"/> <!-- 定義消息Destination --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="SpringTopic"/> </bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="signIncomeQueue"/> </bean> <!-- 消息發送者客戶端 --> <bean id="providerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="providerConnectionFactory" /> <!--<property name="defaultDestination" ref="topicDestination" />--> <property name="defaultDestination" ref="queueDestination" /> <!-- 開啓訂閱模式 --> <property name="pubSubDomain" value="false"/> <!--<property name="receiveTimeout" value="10000" />--> <!-- deliveryMode, priority, timeToLive 的開關要生效,必須配置爲true,默認false--> <property name="explicitQosEnabled" value="true"/> <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/> <!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 --> <property name="deliveryMode" value="2"/> </bean> <!-- 配置消息消費監聽者 --> <bean id="consumerMessageListener" class="com.company.project.service.mq.ConsumerMessageListener" /> <bean id="consumerListenerClient" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="providerConnectionFactory" /> <property name="concurrentConsumers" value="10"/> <!--<property name="concurrency" value="10-20"/>--> <!-- 開啓訂閱模式 --> <property name="pubSubDomain" value="true"/> <!--<property name="destination" ref="topicDestination" />--> <property name="destination" ref="topicDestination" /> <property name="subscriptionDurable" value="true"/> <!---這裏是設置接收客戶端的ID,在持久化時,但這個客戶端不在線時,消息就存在數據庫裏,直到被這個ID的客戶端消費掉--> <property name="clientId" value="consumerClient"/> <property name="messageListener" ref="consumerMessageListener" /> <!-- 消息應答方式 Session.AUTO_ACKNOWLEDGE 消息自動簽收 Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送 --> <property name="sessionAcknowledgeMode" value="2"/> </bean> </beans>
發送時,java最經常使用的有兩種格式,textMessage和mapMessage,apache
package com.company.project.service.impl; import com.alibaba.fastjson.JSON; import com.hisense.hitv.service.IQueueService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.jms.*; import java.util.HashMap; import java.util.Map; @Service public class QueueServiceImpl implements IQueueService { @Autowired private JmsTemplate jmsTemplate; public boolean pushMessage2QueueIncome(String uid, Integer incomeType, Integer incomeValue, Integer productCode) { sendMqMessageIncome(null, uid, incomeType, incomeValue, productCode); return false; } /** * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination * @param destination */ private void sendMqMessageIncome(Destination destination, final String uid,final Integer incomeType, final Integer incomeValue,final Integer productCode){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Map param = new HashMap(); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("customerId", uid); param.put("customerId", uid); mapMessage.setInt("incomeType", incomeType); param.put("incomeType", incomeType); mapMessage.setInt("incomeValue", incomeValue); param.put("incomeValue", incomeValue); mapMessage.setInt("productCode",productCode); param.put("productCode", productCode); TextMessage message= session.createTextMessage(); message.setText(JSON.toJSONString(param)); return message; } }); } }
監聽經過onMessage接口實現,監聽broker推送消息json
package com.company.project.service.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.SessionAwareMessageListener; import javax.jms.*; public class ConsumerMessageListener implements SessionAwareMessageListener{ private static Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class); public void onMessage(Message message, Session session) throws JMSException { MapMessage tm = (MapMessage) message; try { logger.info("---------消息消費---------"); logger.info("消息ID:\t" + tm.getJMSMessageID()); } catch (JMSException e) { session.recover();//喚起重傳 e.printStackTrace(); } } }