爲了實現分佈式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。咱們不少時候都會考慮將消息系統歸入咱們的選擇中;好比我一個登陸事件,有可能我登陸以後須要作不少東西,好比日誌,好比發佈消息,好比推送,再好比發送代金券等等;這些事件與登陸息息相關,可是本質上它們與登陸這個事件沒有直接的關係,只是在登陸事件後,系統按照需求須要去初始化一些東西,或者去記錄一些東西等等;若是把全部的東西都歸入到登陸這個事件中(同一個事物中),那登陸的事件內處理的邏輯更多,會形成什麼後果?登陸時間很長,讓用戶沒法忍受,另外,假如登陸過程當中出現了未發現異常,那是否是致使用戶直接沒法登陸?爲了解決這樣的問題,咱們引入了消息系統,好比我這臺機登陸事後,我將登陸的一些信息,經過遠程方式發送到另一臺機器上(或者同一臺機),讓它們去處理相應的後續邏輯實現;html
目的是:一、用戶登陸更快,體驗上更好,java
二、只要保證登陸部分完整,即使後續出錯,並不影響用戶正常使用,即容錯性更強!算法
談到消息系統,首先想到的第一個問題確定會是:負載均衡
消息的順序性dom
原本很想說一下關於消息順序性的一些問題,不過因爲我也是借鑑了一些其餘的帖子,以及官方的文檔,因此這裏就不會去贅述這些了,稍後我會分享一些很不錯的連接,留給本身之後看,也但願能夠給一些恰好要入門rocketmq的網友提供一些資料;分佈式
rocketmq是阿里雲的一套開源產品,功能什麼的就不贅述了,請自行去網站了解:https://help.aliyun.com/document_detail/29532.html?spm=5176.doc34411.6.104.EvZr21ide
rocketmq是一類二級消息分類的產品,一級爲topic,二級爲tag;網站
broker按照收到生產者發送的消息體,分析其中的topic,而後去找到相應的topic轉發出去,在消費端,消費者根據收到的消息分析出tag的不一樣去作不一樣的邏輯處理;this
那麼在這個時候,咱們就會好奇,爲了保證消息的順序執行的狀況,RokectMQ是如何選擇topic?爲此,咱們先看看rokcetmq的源代碼:阿里雲
// 官方例子以下: public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 10000000; i++) try { { Message msg = new Message("TopicTest",// topic "TagA",// tag "OrderID188",// key ("Hello MetaQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// body SendResult sendResult = producer.send(msg); //發送消息 System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } } // defalutMQProducer 類下封裝的方法 @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq); } // send的實現方法 @Override public SendResult send(Message message) { this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); try { com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ); message.setMsgID(sendResultRMQ.getMsgId()); SendResult sendResult = new SendResult(); sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());//如何選擇topic sendResult.setMessageId(sendResultRMQ.getMsgId()); return sendResult; } catch (Exception e) { log.error(String.format("Send message Exception, %s", message), e); throw checkProducerException(message.getTopic(), message.getMsgID(), e); } }
在官方例子中,咱們能夠看到,在發送消息的時候,咱們並無去了解細緻的發送消息時,那麼MQ究竟是如何選擇topic的?
可是能夠從代碼中看到,它確實有個MessageQueueSelector 接口,這個接口負責是選擇topic,那麼咱們就來看看它到底爲咱們提供了那些實現方法吧(通常的消息都是輪詢去尋找topic來實現負載均衡):
/** * 若是lastBrokerName不爲null,則尋找與其不一樣的MessageQueue(輪詢負載均衡) */ public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName != null) { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size();//輪詢 MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return null; } else { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); return this.messageQueueList.get(pos); } }
若是對於這種輪詢方式的負載均衡不滿意,並不能打達到咱們的需求,那麼咱們又改如何去選擇?
阿里雲提供了三種方式來解決咱們的需求,若是再不能知足,那麼就知道修改源碼算法部分來達到本身的要求了。
/** * 使用哈希算法來選擇隊列,順序消息一般都這樣作<br> * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-6-27 */ public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } } /** * 根據機房來選擇發往哪一個隊列,支付寶邏輯機房使用 * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-25 */ public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // TODO Auto-generated method stub return null; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } } /** * 發送消息,隨機選擇隊列 * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-25 */ public class SelectMessageQueueByRandoom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = random.nextInt(); if (value < 0) { value = Math.abs(value); } value = value % mqs.size(); return mqs.get(value); } }
能夠看到,rocketmq爲咱們提供了三個選擇(除去輪詢方法),那麼若是咱們在很是關注消息順序的時候,咱們能夠選擇經過哈希算法求值的方式來實現
SelectMessageQueueByHash
咱們每一個傳遞進入的對象都會被哈希算法計算出 一個哈希值,好比咱們傳遞的是訂單號,那麼無疑咱們能夠保證相同的訂單號能夠傳遞給相同的topic去處理,那麼只要再保證是一致的tag就能夠保證順序的一致性啦;
目的是:生產者 -- MQ服務端 -- 消費者 能夠達到一一對應的關係
第二種是機房選擇,算法是木有啦,應該是根據ip地址去區分,反正概念我不是很清晰,也沒有去注意和了解;有了解的親留個資料給我吧,連接就好,謝謝撒……
第三種是隨機選擇,也就是誰也不知道它到底會選擇誰,這種效率其實不好,沒有負載均衡,誰也不知道會不會堵塞起來,誰也不知道某個隊列是否已經塞滿。
有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決它們身上,其實是浪費的,效率低下的。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:
一、不關注亂序的應用實際大量存在
二、隊列無序並不意味着消息無序
參考連接以下:
http://www.jianshu.com/p/453c6e7ff81c
http://blog.csdn.net/column/details/learningrocketmq.html?&page=1
路走多了,早晚有人會翻車……