本文記錄學習在Spring Boot中使用MQ。html
MQ全稱(Message Queue)又名消息隊列,是一種異步通信的中間件。它的做用相似於郵局,發信人(生產者)只須要將信(消息)交給郵局,而後由郵局再將信(消息)發送給具體的接收者(消費者),具體發送過程與時間發信人能夠不關注,也不會影響發信人作其它事情。目前常見的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。java
使用MQ的優勢主要有:spring
1 方法的異步執行 使用MQ能夠將耗時的同步操做經過以發送消息的方式進行了異步化處理,減小了因爲同步而等待的時間;apache
2 程序之間鬆耦合 使用MQ能夠減小了服務之間的耦合性,不一樣的服務能夠經過消息隊列進行通訊,只要約定好消息的內容格式就行;服務器
JMS(Java Message Service)即java消息服務,是一個Java平臺中關於面向消息中間件(MOM)的 API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。JMS的消息機制有2種模型,一種是1對1(Point to Point)的隊列的消息,這種消息,只能被一個消費者消費;另外一種是一對多的發佈/訂閱(Topic)消息,一條消息能夠被多個消費者消費。ActiveMq是對JMS的一個實現。app
官網下載一個服務程序,解壓後直接啓動服務就能夠了,下載地址:http://activemq.apache.org/activemq-5158-release.htmldom
SpringBoot也對Active MQ提供了支持,咱們使用時引入具體的依賴便可,修改pom.xml文件,添加依賴異步
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
在application.properties文件中配置Active MQ服務器的鏈接信息tcp
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
#spring.jms.pub-sub-domain=true
完成以上配置信息後,當咱們在啓動SpringBoot項目時,會自動幫咱們完成初始化操做,並提供一個JmsMessagingTemplate,提提供了咱們經常使用發送消息的各類方法供咱們使用。咱們只須要在使用的地方注入JmsMessagingTemplate便可使用。分佈式
發送隊列消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void testQueueMsg(){
//建立名稱爲zyQueue的隊列
Queue queue = new ActiveMQQueue("zyQueue");
//向隊列發送消息
jmsMessagingTemplate.convertAndSend(queue,"這是一個隊列消息!");
}
}
消息的接收方,監聽消息隊列,當隊列中有消息時就能夠獲取到消息
@Component
public class Consumer {
private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");
/**
* destination 目標地址即隊列
*/
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
}
執行測試方法發送消息能夠看到,控制檯輸出的消費者接受到消息
隊列消息只能有一個消費者,若是有多個消費者同時監聽一個隊列時,只能有一個拿到消息,咱們測試,修改發送方法,循環發送10調消息
@Test
public void testQueueMsg(){
//建立名稱爲zyQueue的隊列
Queue queue = new ActiveMQQueue("zyQueue");
//向隊列發送消息
for (int i=0;i<10;i++) {
jmsMessagingTemplate.convertAndSend(queue,"這是第"+i+"個隊列消息!");
}
}
在Consumer 類中再添加一個消費者,監聽隊列zyQueue
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyQueue")
public void receiveMessage1(String text){
System.out.println("1接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
執行發送消息,看到控制檯輸出的結果,2個消費者平分了這10條消息
若是但願監聽同一個隊列的多個消費者都能接收到全部消息,咱們就只能發送Topic消息了,咱們修改application.properties中的
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
spring.jms.pub-sub-domain=true
表示要發送發佈/訂閱消息,發送消息的隊列改用Topic發送消息,以下
@Test
public void testTopicMsg(){
Topic topic = new ActiveMQTopic("zyTopic");
for (int i=0;i<5;i++){
jmsMessagingTemplate.convertAndSend(topic,"這是第"+i+"個Topic消息!");
}
}
咱們在Consumer 類中添加兩個消費者來監聽zyTopic隊列,接受消息
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage1(String text){
System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage2(String text){
System.out.println("消費者2接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
執行發消息方法,能夠看到控制檯輸出的內容,2個消費者都完整的接收到了5條消息
咱們在測試發送消息時修改了屬性文件中的配置信息,才能夠發送對應的類型的消息,這是因爲SpringBoot中默認的是隊列消息(查看源碼能夠知道,監聽器默認使用的DefaultJmsListenerContainerFactory),若是咱們想在不修改配置信息的狀況下能夠同時發送Queue和Topic消息怎麼辦呢,咱們須要手動的更改初始的配置類,分別針對Queue和Topic消息提供JmsListenerContainerFactory
新建一個配置類,以下
@SpringBootConfiguration
public class ActiveMqConfig {
@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//設置消息模型爲隊列
factory.setPubSubDomain(false);
return factory;
}
@Bean("topicListenerFactory")
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//設置消息模型爲隊列
factory.setPubSubDomain(true);
return factory;
}
}
在容器啓動時會針對兩種消息類型,初始化獲得兩個不一樣的JmsListenerContainerFactory。下來再修改消費者類,在 @JmsListener 註解中指定 containerFactory,如
@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
public void receiveTopicMessage1(String text){
System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,而後註釋掉屬性文件中的消息模式配置就能夠了。