點我官網下載
javascript
解壓下載的壓縮文件到任意目錄中(
eg. C:\Program Files (x86)\apache-activemq-5.14.5
),進入%ACTIVEMQ_HOME%/bin目錄,根據本身的系統位數,進入32/64目錄,點擊activemq.bat
啓動ActiveMQ;html
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.5.RELEASE</version>
<!--<version>{spring.version}</version>-->
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>複製代碼
在配置文件中加入如下配置信息,每一個配置信息都有具體的解釋:java
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--配置鏈接ActiveMQ的鏈接基本信息 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- 配置JMS鏈接工廠 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定義消息隊列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg>
<value>testQueue</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默認是false -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 配置消息隊列監聽者(Queue) -->
<!-- 打開監聽器,會當即去消費消息(即,起到實時消費通訊的做用) -->
<!-- <bean id="queueMessageListener" class="com.hp.common.listener.QueueMessageListener"></bean> -->
<!-- 顯示注入消息監聽容器(Queue),配置鏈接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
<!-- <bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
-->
</beans>複製代碼
注:在配置文件中,必定不要忘記加入ActiveMQ和JMS相關的schemaweb
建立ProducerService,用於發送信息到消息中心spring
@Service
public class ProducerService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
private Queue queue;
/** * 根據目的地發送消息 */
public void sendMessage(Destination destination, final String msg) {
System.out.println(Thread.currentThread().getName() + " 向隊列" + destination.toString()
+ "發送消息------->" + msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
public String send(String userId, String msg) {
System.out.println(
Thread.currentThread().getName() + " 向 " + userId + " 的隊列" + userId.toString() + "發送消息------>" + msg);
queue = new ActiveMQQueue(userId);
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message=session.createTextMessage(msg);
message.setStringProperty(userId, msg);
return message;
}
});
return "發送成功";
}
/** * 向默認目的地發送消息 */
public String sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestinationName();
System.out
.println(Thread.currentThread().getName() + " 向隊列" + destination + "發送消息---------------------->" + msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
return "發送成功";
}
}複製代碼
建立ConsumerService,用於接受消息apache
@Service
public class ConsumerService{
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
public String receive(Destination destination) {
TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
try {
System.out.println("從隊列" + destination.toString() + "收到了消息:\t" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
return textMessage.toString();
}
public String receive(String userId) {
Queue queue=new ActiveMQQueue(userId+"?consumer.prefetchSize=4");
Message message = null;
String property=null;
try {
message=jmsTemplate.receive(queue);
property=message.getStringProperty(userId);
System.out.println("從隊列" + queue.toString() + "收到了消息:\t" + property);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return property;
}
}複製代碼
@Controller
@RequestMapping(value="/mq")
public class MessageController {
private Logger logger = Logger.getLogger(MessageController.class);
@Resource(name = "demoQueueDestination")
private Destination destination;
@Autowired
private ProducerService producer;
@Autowired
private ConsumerService consumer;
@RequestMapping(value = "/SendMessage", method = RequestMethod.POST,produces="application/json")
@ResponseBody
public void send(@RequestParam(value = "userId",required=false)String userId,@RequestParam(value = "msg")String msg) {
logger.info(Thread.currentThread().getName() + "------------send to jms Start");
if (userId==null||"".equals(userId)) {
producer.sendMessage(destination, msg);
}else {
producer.send(userId, msg);
}
logger.info(Thread.currentThread().getName() + "------------send to jms End");
}
@RequestMapping(value = "/ReceiveMessage", method = RequestMethod.GET)
@ResponseBody
public Object receive(@RequestParam(value = "userId",required=false)String userId) {
logger.info(Thread.currentThread().getName() + "------------receive from jms Start");
String tm=null;
if (userId==null||"".equals(userId)) {
tm = consumer.receive(destination);
} else {
tm = consumer.receive(userId);
}
logger.info(Thread.currentThread().getName() + "------------receive from jms End");
return tm.toString();
}
}複製代碼
若是在配置文件中打開了監聽器的註釋,即打開監聽器,消費者會當即去消費消息,則還須要添加以下代碼:json
public class QueueMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm=(TextMessage) message;
try {
System.out.println("QueueMessageListener監聽到了文本消息:\t"
+ tm.getText());
//do other work
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}複製代碼
啓動tomcat,將Javaweb項目運行在tomcat中,經過postman測試接口和方法
接受消息接口:http://localhost:8080/`{project_neme}`/mq/ReceiveMessage?userId={消息隊列名稱}
發送消息接口:http://localhost:8080/`{project_neme}`/mq/SendMessage?userId={消息隊列名稱}&msg={參數}tomcat
場景1: 對於mq隊列中的消息,系統須要作一些監控或者問題的跟蹤,則須要去查看MQ中的數據,可是有須要保證在查看以後不會被刪除,由於在P2P模式中,consumer.receive()後消息以後,消息就被消費,MQ不會發送其餘consumer,對於這種場景該如何考慮採用ActiveMQ的何種技術去作?
場景2:將使用JDBC持久化的ActiveMQ轉換爲其餘存儲方式(文件存儲、Kaha、memory),須要作數據遷移,那如何實現?
解決:對於這兩種場景,均可以用消息隊列中消息查看的方式去實現;
第一個場景,使用ActiveMQ的Browser能夠查看未被消費的信息,這樣既保證數據不會被消費,也能夠實現本身的其餘業務;
第二個場景,可使用Browser將未被消費的信息拿出來,而後再經過produce.send()的方式,將消息發送到其餘存儲方式的ActiveMQ上;session
如下代碼實現了使用Browser讀出某個隊列中未消費的全部消息,並將它們放到list中app
public class BrowersService {
private static final Logger logger=LogManager.getLogger(BrowersService.class);
//配置文件配置的jmsTemplate
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
public void getMessageFromQuese(String queueName){
List<String> message=jmsTemplate.browse(queueName, new BrowserCallback<List<String>>() {
@Override
public List<String> doInJms(Session session, QueueBrowser browser) throws JMSException {
Enumeration<TextMessage> enumeration=browser.getEnumeration();
List<String> messages=new ArrayList<>();
while (enumeration.hasMoreElements()) {
TextMessage textMessage = (TextMessage) enumeration.nextElement();
logger.info("Message text: "+ textMessage.getText()
+" ID: "+textMessage.getJMSMessageID());
messages.add(textMessage.getText());
}
return messages;
}
});
logger.info("message from browser "+message);
}
}複製代碼