ActiveMQ

簡介及安裝

官網地址
下載下來直接解壓,進入bin目錄,執行命令./activemq start便可啓動,如下是相關目錄結構說明html

  • bin存放的是腳本文件
  • conf存放的是基本配置文件
  • data存放的是日誌文件
  • docs存放的是說明文檔
  • examples存放的是簡單的實例
  • lib存放的是activemq所需jar包
  • webapps用於存放項目的目錄\

默認的服務端口爲61616,不過該端口能夠在conf目錄下的activemq.xml中進行修改,找到transportConnectors標籤,修改openwire中的端口便可。
監控平臺默認的端口爲8161,若是要修改該端口,能夠修改conf/jetty.xml文件中的jetty啓動端口。默認的用戶名和密碼爲admin/admin,user/user,若是要修改用戶名和密碼,則在conf/jetty-realm.properties中進行修改便可,格式爲[用戶名:密碼,角色名],關於web管理界面部分列說明以下:java

  • Number Of Consumers:這個是消費者端的消費者數量
  • Number Of Pending Messages:等待消費的消息 「這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數」
  • Messages Enqueued:進入隊列的消息 「進入隊列的總數量,包括出隊列的。 這個數量只增不減」
  • Messages Dequeued:出了隊列的消息 「能夠理解爲是消費這消費掉的數量」

springBoot集成

  1. 引入jar包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
複製代碼
  1. 配置文件
# MQ地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集羣配置
#spring.activemq.broker-url=failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)
# 在考慮結束以前等待的時間
#spring.activemq.close-timeout=15s
# 是否啓用內存模式
spring.activemq.in-memory=true 
# 是否在回滾回滾消息以前中止消息傳遞。這意味着當啓用此命令時,消息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 等待消息發送響應的時間。設置爲0等待永遠。
spring.activemq.send-timeout=0
#默認狀況下activemq提供的是queue模式,若要使用topic模式須要配置下面配置,消費端使用
spring.jms.pub-sub-domain=true
#帳號
spring.activemq.user=admin
# 密碼
spring.activemq.password=admin
複製代碼
  1. 消息發送端代碼,我將消息生成和目的地解耦了,這樣更有利於擴展
@Service("producer")
public class Producer {
	@Autowired
	private JmsMessagingTemplate jmsTemplate;
	
	public void sendMessage(Destination destination, final Object msg){
		jmsTemplate.convertAndSend(destination, msg);
	}
}
@Configuration
public class DestinationConfig {

	@Bean(name = "testTopic")
	public Topic getTestTopic(){
		return new ActiveMQTopic("topic.test");
	}
	@Bean(name = "testQueue")
	public Queue getTestQueue(){
		return new ActiveMQQueue("queue.test");
	}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
	@Autowired
	private Producer producer;

	@Resource(name = "testTopic")
	private Topic testTopic;
	@Resource(name = "testQueue")
	private Queue testQueue;

	@Test
	public void sendTopicMsg(){
		producer.sendMessage(testTopic, "This is test Topic");
	}

	@Test
	public void sendQueueMsg(){
		producer.sendMessage(testQueue, "This is test Queue");
	}

}
複製代碼
  1. 消費端代碼
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/** * containerFactory配置請參考第5點說明 * 該配置用來指明該destination是隊列仍是主題消息 */
@Component
public class Consumer {
	@JmsListener(destination = "topic.test", containerFactory = "jmsListenerContainerTopic")
	public void recevieTopicMsg(String msg){
		System.out.println("接收的主題消息爲:"+msg);
	}

	@JmsListener(destination = "queue.test", containerFactory = "jmsListenerContainerQueue")
	//@SendTo("otherTopic") // 表示將方法的返回值發送到另外的隊列,含有這個註解的話,則方法須要返回值
	public void recevieQueueMsg(String msg){
		System.out.println("接收的隊列消息爲:"+msg);
	}
}
複製代碼
  1. 若是項目既須要監聽topic,也須要監聽queue,則須要單獨配置,代碼以下
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
public class MqConfig {
	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
		DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
		beanFactory.setPubSubDomain(true);
		beanFactory.setConnectionFactory(connectionFactory);
		return beanFactory;
	}

	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
		DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
		beanFactory.setConnectionFactory(connectionFactory);
		return beanFactory;
	}
}
複製代碼
  1. 在實際應用中,咱們發送消息一般但願知道消息是否發送成功,下面代碼我經過線程池發送消息,而且返回發送是否成功信息,僅供參考,若是有更好方式,歡迎留言討論交流。
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import javax.annotation.Resource;
import javax.jms.*;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.ProducerCallback;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class.getName());

    private static ExecutorService pool = Executors.newCachedThreadPool();

    @Autowired
    private JmsTemplate jmsTemplate;

    @Resource(name = "testTopic")
    private Topic testTopic;
    public void sendTechStatRefresh(String json) {
        try {
            sendMsgResultNotification(sendTopicMsg(testTopic, json), "主題消息測試", json);
        } catch (Throwable throwable) {
            LOGGER.error(throwable.getMessage(), throwable);
        }
    }

    @Resource(name = "testQueue")
    private Queue testQueue;
    public void sendLotteryMatchScore(String msg) {
        try {
            sendMsgResultNotification(sendQueueMsg(testQueue, msg), "隊列消息測試", msg);
        } catch (Throwable throwable) {
            LOGGER.error(throwable.getMessage(), throwable);
        }
    }

    private boolean sendTopicMsg(final Topic topic, final String msg) {
        if (topic == null || StringUtils.isBlank(msg)) {
            throw new NullPointerException("topic is null or msg is null");
        }
        try {
            return pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    jmsTemplate.convertAndSend(topic, msg);
                    return Boolean.TRUE;
                }
            }).get();
        } catch (Throwable throwable) {
            try {
                LOGGER.error("send msg to topic error:topic = " + topic.getTopicName() + " msg = " + msg.toString(), throwable);
            } catch (Exception e) {
                LOGGER.error("發送主題消息異常", e);
            }
        }
        return Boolean.FALSE;
    }

    private boolean sendQueueMsg(final Queue queue,final String msg) {
        if (queue == null || StringUtils.isBlank(msg)) {
            throw new NullPointerException("topic is null or msg is null");
        }

        try {
            return pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    jmsTemplate.convertAndSend(queue, msg);
                    return Boolean.TRUE;
                }
            }).get();
        } catch (Throwable throwable) {
            try {
                LOGGER.error("send msg to queue error:queue = " + queue.getQueueName() + " msg = " + msg.toString(),
                        throwable);
            } catch (Exception e) {
                LOGGER.error("發送隊列消息異常", e);
            }
        }
        return Boolean.FALSE;
    }

    private void sendMsgResultNotification(final boolean rs, final String msgType, final String data) {
        if (rs) {
            LOGGER.debug("推送 " + msgType + "成功 : 消息[" + data + "]");
        }
    }
}
複製代碼

AQMP協議

  • AMQP協議中的元素包括:Message(消息體)、Producer(消息生產者)、Consumer(消息消費者)、Virtual Host(虛擬節點)、Exchange(交換機)、Queue(隊列)等;
  • 由Producer(消息生產者)和Consumer(消息消費者)構成了AMQP的客戶端,他們是發送消息和接收消息的主體。AMQP服務端稱爲Broker,一個Broker中必定包含完整的Virtual Host(虛擬主機)、 Exchange(交換機)、Queue(隊列)定義;
  • 一個Broker能夠建立多個Virtual Host(虛擬主機),咱們將討論的Exchange和Queue都是虛擬機中的工做元素(還有User元素)。注意,若是AMQP是由多個Broker構成的集羣提供服務,那麼一個Virtual Host也能夠由多個Broker共同構成;
  • Connection是由Producer(消息生產者)和Consumer(消息消費者)建立的鏈接,鏈接到Broker物理節點上。可是有了Connection後客戶端還不能和服務器通訊,在Connection之上客戶端會建立Channel,鏈接到Virtual Host或者Queue上,這樣客戶端才能向Exchange發送消息或者從Queue接受消息。一個Connection上容許存在多個Channel,只有Channel中可以發送/接受消息。
  • Exchange元素是AMQP協議中的交換機,Exchange能夠綁定多個Queue也能夠同時綁定其餘Exchange。消息經過Exchange時,會按照Exchange中設置的Routing(路由)規則,將消息發送到符合的Queue或者Exchange中。

STOMP協議

STOMP是一個簡單的可互操做的協議, 被用於經過中間服務器在客戶端之間進行異步消息傳遞。它定義了一種在客戶端與服務端進行消息傳遞的文本格式。 STOMP是基於幀的協議,它假定底層爲一個2-way的可靠流的網絡協議(如TCP)。客戶端和服務器通訊使用STOMP幀流通信。web

參考文檔:blog.csdn.net/u012758088/…spring

相關文章
相關標籤/搜索