消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。java
目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。spring
如下介紹消息隊列在實際應用中經常使用的使用場景。異步處理,應用解耦,流量削鋒和消息通信四個場景。本篇使用ActiveMQ+SpringBoot來模擬這四個場景。數據庫
場景說明:汽車觸發圍欄報警後,須要發送報警郵件和報警短信。傳統的作法有兩種1.串行的方式;2.並行方式。apache
(1)串行方式:將報警信息寫入數據庫成功後,發送報警郵件,再發送報警短信。以上三個任務所有完成後,該報警信息加入統計列表。網絡
(2)並行方式:報警信息寫入數據庫成功後,同時發送報警郵件和短信。session
假設三個業務節點每一個使用50毫秒鐘,不考慮網絡等其餘開銷,則串行方式的時間是150毫秒,並行的時間多是100毫秒。架構
由於CPU在單位時間內處理的請求數是必定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。併發
小結:如以上案例描述,傳統的方式系統的性能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?app
引入消息隊列,將不是必須的業務邏輯,異步處理。改造後的架構以下:異步
代碼示例
①在pom文件中引入activemq依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>1.5.6.RELEASE</version> </dependency>
②在配置文件中加上activemq的配置
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考慮結束以前等待的時間
#spring.activemq.close-timeout=15s
# 默認代理URL是否應該在內存中。若是指定了顯式代理,則忽略此值。
spring.activemq.in-memory=true
# 是否在回滾回滾消息以前中止消息傳遞。這意味着當啓用此命令時,消息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 密碼
spring.activemq.password=123456
# 等待消息發送響應的時間。設置爲0等待永遠。
spring.activemq.send-timeout=0
spring.activemq.user=haha
# 是否信任全部包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗號分隔列表(當不信任全部包時)
#spring.activemq.packages.trusted=
# 當鏈接請求和池滿時是否阻塞。設置false會拋「JMSException異常」。
#spring.activemq.pool.block-if-full=true
# 若是池仍然滿,則在拋出異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啓動時建立鏈接。能夠在啓動時用於加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 鏈接過時超時。
#spring.activemq.pool.expiry-timeout=0ms
# 鏈接空閒超時
#spring.activemq.pool.idle-timeout=30s
# 鏈接池最大鏈接數
#spring.activemq.pool.max-connections=1
# 每一個鏈接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試從新鏈接
#spring.activemq.pool.reconnect-on-exception=true
# 在空閒鏈接清除線程之間運行的時間。當爲負數時,沒有空閒鏈接驅逐線程運行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
③消息生產者
import java.util.Map; import javax.jms.Destination; import javax.jms.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessagePostProcessor; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 報警消息Producer * @author ko * */ @Component //@EnableScheduling public class AlarmProducer { // 也能夠注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝 @Autowired private JmsTemplate jmsTemplate; // private JmsMessagingTemplate jmsTemplate; // @Autowired // private Queue queue; // @Scheduled(fixedDelay=5000) // 5s執行一次 只有無參的方法才能用該註解 public void sendMessage(Destination destination, String message){ // jmsTemplate.convertAndSend(destinationName, payload, messagePostProcessor); this.jmsTemplate.convertAndSend(destination, message); }
// 雙向隊列
// @JmsListener(destination="out.queue")
// public void consumerMessage(String text){
// System.out.println("從out.queue隊列收到的回覆報文爲:"+text);
// }
}
④消息消費者
import org.apache.commons.lang.StringUtils; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * 圍欄報警Consumer * @author ko * */ @Component public class AlarmConsumer { // 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息 @JmsListener(destination = "mytest.queue")
// @SendTo("out.queue") 爲了實現雙向隊列 public void receiveQueue(String text) { if(StringUtils.isNotBlank(text)){ System.out.println("AlarmConsumer收到的報文爲:"+text); System.out.println("把報警信息["+text+"]發送郵件給xxx"); System.out.println("把報警信息["+text+"]發送短信給xxx"); System.out.println(""); } } }
⑤controller裏寫上測試接口
@Autowired private AlarmProducer alarmProducer; @RequestMapping(value="/chufabaojing",method=RequestMethod.GET) @ApiOperation(value="觸發報警", notes="觸發報警") @ApiImplicitParams({ @ApiImplicitParam(name = "devicename", value = "name",example = "xxxx", required = true, dataType = "string",paramType="query"), }) public String chufabaojing(String devicename){ List<String> alarmStrList = new ArrayList<>(); alarmStrList.add(devicename+"out fence01"); alarmStrList.add(devicename+"out fence02"); alarmStrList.add(devicename+"in fence01"); alarmStrList.add(devicename+"in fence02"); System.out.println("設備"+devicename+"出圍欄報警"); // 報警信息寫入數據庫 System.out.println("報警數據寫入數據庫。。。"); // 寫入消息隊列 Destination destination = new ActiveMQQueue("mytest.queue"); for (String alarmStr : alarmStrList) { alarmProducer.sendMessage(destination, alarmStr); } // 消息寫進消息隊列裏就無論了 // 下面兩步驟移到activemq消費者裏 // 發送郵件 // 發送短信 return "success"; }
場景介紹,在spring cloud分佈式微服務項目中,工單管理和設備管理分別是兩個微服務,若是A工單被張三接單了,那麼工單狀態要設爲已派單,檢驗員設爲張三,設備狀態要置爲在檢。
傳統的作法是,先調用工單管理的工單更新接口,成功以後再調用設備管理的設備更新接口,成功以後再返回操做提示給用戶。這樣作的缺點是應用耦合,若是在派單操做的時候正好設備管理微服務掛了或者阻塞了,那麼派單操做就會失敗或者要等待很長時間無反饋。另外若是設備管理的接口有變更,那麼工單管理裏面的代碼也要改動。
引入消息中間件,派單的時候,工單管理的工單更新接口處理好後把信息寫入消息隊列,而後直接返回操做反饋給用戶。無論工單管理服務正不正常,正常就從消息隊列裏訂閱消息處理,不正常就等待回覆正常後再訂閱消息處理。
場景介紹,XX公司的系統原來是針對A地區的客戶開發的,如今爲了搶佔市場,拿下了B和C兩個地區的客戶,那麼新系統上線,就存在如何把B和C的基礎數據導入XX公司的系統中來的問題,短期內要把龐大的舊數據改造適合新系統再導入進來,這很容易使系統掛掉,另外天天還有增量數據產生。
這時能夠引入消息中間件,B和C的客戶只要負責把數據規則放到消息隊列裏就行了,XX公司能夠有條不紊的從消息隊列裏訂閱數據,能夠有效緩解短期內的高流量壓力,可是這也對消息中間件的可靠性提出了要求。
若是遇到activemq的瓶頸,能夠看看activemq集羣方案,這篇文章 http://blog.csdn.net/shuangzh115/article/details/50989182
相似聊天室的功能。