官網地址
下載下來直接解壓,進入bin目錄,執行命令./activemq start便可啓動,如下是相關目錄結構說明html
默認的服務端口爲61616,不過該端口能夠在conf目錄下的activemq.xml中進行修改,找到transportConnectors標籤,修改openwire中的端口便可。
監控平臺默認的端口爲8161,若是要修改該端口,能夠修改conf/jetty.xml文件中的jetty啓動端口。默認的用戶名和密碼爲admin/admin,user/user,若是要修改用戶名和密碼,則在conf/jetty-realm.properties中進行修改便可,格式爲[用戶名:密碼,角色名],關於web管理界面部分列說明以下:java
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
複製代碼
# MQ地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集羣配置
#spring.activemq.broker-url=failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)
# 在考慮結束以前等待的時間
#spring.activemq.close-timeout=15s
# 是否啓用內存模式
spring.activemq.in-memory=true
# 是否在回滾回滾消息以前中止消息傳遞。這意味着當啓用此命令時,消息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 等待消息發送響應的時間。設置爲0等待永遠。
spring.activemq.send-timeout=0
#默認狀況下activemq提供的是queue模式,若要使用topic模式須要配置下面配置,消費端使用
spring.jms.pub-sub-domain=true
#帳號
spring.activemq.user=admin
# 密碼
spring.activemq.password=admin
複製代碼
@Service("producer")
public class Producer {
@Autowired
private JmsMessagingTemplate jmsTemplate;
public void sendMessage(Destination destination, final Object msg){
jmsTemplate.convertAndSend(destination, msg);
}
}
@Configuration
public class DestinationConfig {
@Bean(name = "testTopic")
public Topic getTestTopic(){
return new ActiveMQTopic("topic.test");
}
@Bean(name = "testQueue")
public Queue getTestQueue(){
return new ActiveMQQueue("queue.test");
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private Producer producer;
@Resource(name = "testTopic")
private Topic testTopic;
@Resource(name = "testQueue")
private Queue testQueue;
@Test
public void sendTopicMsg(){
producer.sendMessage(testTopic, "This is test Topic");
}
@Test
public void sendQueueMsg(){
producer.sendMessage(testQueue, "This is test Queue");
}
}
複製代碼
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/** * containerFactory配置請參考第5點說明 * 該配置用來指明該destination是隊列仍是主題消息 */
@Component
public class Consumer {
@JmsListener(destination = "topic.test", containerFactory = "jmsListenerContainerTopic")
public void recevieTopicMsg(String msg){
System.out.println("接收的主題消息爲:"+msg);
}
@JmsListener(destination = "queue.test", containerFactory = "jmsListenerContainerQueue")
//@SendTo("otherTopic") // 表示將方法的返回值發送到另外的隊列,含有這個註解的話,則方法須要返回值
public void recevieQueueMsg(String msg){
System.out.println("接收的隊列消息爲:"+msg);
}
}
複製代碼
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
public class MqConfig {
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
beanFactory.setPubSubDomain(true);
beanFactory.setConnectionFactory(connectionFactory);
return beanFactory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
beanFactory.setConnectionFactory(connectionFactory);
return beanFactory;
}
}
複製代碼
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import javax.jms.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.ProducerCallback;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class.getName());
private static ExecutorService pool = Executors.newCachedThreadPool();
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "testTopic")
private Topic testTopic;
public void sendTechStatRefresh(String json) {
try {
sendMsgResultNotification(sendTopicMsg(testTopic, json), "主題消息測試", json);
} catch (Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
}
@Resource(name = "testQueue")
private Queue testQueue;
public void sendLotteryMatchScore(String msg) {
try {
sendMsgResultNotification(sendQueueMsg(testQueue, msg), "隊列消息測試", msg);
} catch (Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
}
private boolean sendTopicMsg(final Topic topic, final String msg) {
if (topic == null || StringUtils.isBlank(msg)) {
throw new NullPointerException("topic is null or msg is null");
}
try {
return pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
jmsTemplate.convertAndSend(topic, msg);
return Boolean.TRUE;
}
}).get();
} catch (Throwable throwable) {
try {
LOGGER.error("send msg to topic error:topic = " + topic.getTopicName() + " msg = " + msg.toString(), throwable);
} catch (Exception e) {
LOGGER.error("發送主題消息異常", e);
}
}
return Boolean.FALSE;
}
private boolean sendQueueMsg(final Queue queue,final String msg) {
if (queue == null || StringUtils.isBlank(msg)) {
throw new NullPointerException("topic is null or msg is null");
}
try {
return pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
jmsTemplate.convertAndSend(queue, msg);
return Boolean.TRUE;
}
}).get();
} catch (Throwable throwable) {
try {
LOGGER.error("send msg to queue error:queue = " + queue.getQueueName() + " msg = " + msg.toString(),
throwable);
} catch (Exception e) {
LOGGER.error("發送隊列消息異常", e);
}
}
return Boolean.FALSE;
}
private void sendMsgResultNotification(final boolean rs, final String msgType, final String data) {
if (rs) {
LOGGER.debug("推送 " + msgType + "成功 : 消息[" + data + "]");
}
}
}
複製代碼
STOMP是一個簡單的可互操做的協議, 被用於經過中間服務器在客戶端之間進行異步消息傳遞。它定義了一種在客戶端與服務端進行消息傳遞的文本格式。 STOMP是基於幀的協議,它假定底層爲一個2-way的可靠流的網絡協議(如TCP)。客戶端和服務器通訊使用STOMP幀流通信。web
參考文檔:blog.csdn.net/u012758088/…spring