rocketmq 消息隊列的順序性問題

爲了實現分佈式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。咱們不少時候都會考慮將消息系統歸入咱們的選擇中;好比我一個登陸事件,有可能我登陸以後須要作不少東西,好比日誌,好比發佈消息,好比推送,再好比發送代金券等等;這些事件與登陸息息相關,可是本質上它們與登陸這個事件沒有直接的關係,只是在登陸事件後,系統按照需求須要去初始化一些東西,或者去記錄一些東西等等;若是把全部的東西都歸入到登陸這個事件中(同一個事物中),那登陸的事件內處理的邏輯更多,會形成什麼後果?登陸時間很長,讓用戶沒法忍受,另外,假如登陸過程當中出現了未發現異常,那是否是致使用戶直接沒法登陸?爲了解決這樣的問題,咱們引入了消息系統,好比我這臺機登陸事後,我將登陸的一些信息,經過遠程方式發送到另一臺機器上(或者同一臺機),讓它們去處理相應的後續邏輯實現;html

    目的是:一、用戶登陸更快,體驗上更好,java

                   二、只要保證登陸部分完整,即使後續出錯,並不影響用戶正常使用,即容錯性更強!算法

談到消息系統,首先想到的第一個問題確定會是:負載均衡

    消息的順序性dom

原本很想說一下關於消息順序性的一些問題,不過因爲我也是借鑑了一些其餘的帖子,以及官方的文檔,因此這裏就不會去贅述這些了,稍後我會分享一些很不錯的連接,留給本身之後看,也但願能夠給一些恰好要入門rocketmq的網友提供一些資料;分佈式

rocketmq是阿里雲的一套開源產品,功能什麼的就不贅述了,請自行去網站了解:https://help.aliyun.com/document_detail/29532.html?spm=5176.doc34411.6.104.EvZr21ide

rocketmq是一類二級消息分類的產品,一級爲topic,二級爲tag;網站

broker按照收到生產者發送的消息體,分析其中的topic,而後去找到相應的topic轉發出去,在消費端,消費者根據收到的消息分析出tag的不一樣去作不一樣的邏輯處理;this

那麼在這個時候,咱們就會好奇,爲了保證消息的順序執行的狀況,RokectMQ是如何選擇topic?爲此,咱們先看看rokcetmq的源代碼:阿里雲

// 官方例子以下:
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.start();

        for (int i = 0; i < 10000000; i++)
            try {
                {
                    Message msg = new Message("TopicTest",// topic
                            "TagA",// tag
                            "OrderID188",// key
                            ("Hello MetaQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// body
                    SendResult sendResult = producer.send(msg); //發送消息
                    System.out.println(sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}
// defalutMQProducer 類下封裝的方法
    @Override
    public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }

// send的實現方法
@Override
    public SendResult send(Message message) {
        this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);

        try {
            com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);

            message.setMsgID(sendResultRMQ.getMsgId());
            SendResult sendResult = new SendResult();
            sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());//如何選擇topic
            sendResult.setMessageId(sendResultRMQ.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error(String.format("Send message Exception, %s", message), e);
            throw checkProducerException(message.getTopic(), message.getMsgID(), e);
        }
    }

在官方例子中,咱們能夠看到,在發送消息的時候,咱們並無去了解細緻的發送消息時,那麼MQ究竟是如何選擇topic的?

可是能夠從代碼中看到,它確實有個MessageQueueSelector 接口,這個接口負責是選擇topic,那麼咱們就來看看它到底爲咱們提供了那些實現方法吧(通常的消息都是輪詢去尋找topic來實現負載均衡):

/**
     * 若是lastBrokerName不爲null,則尋找與其不一樣的MessageQueue(輪詢負載均衡)
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();//輪詢
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }

            return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos);
        }
    }

若是對於這種輪詢方式的負載均衡不滿意,並不能打達到咱們的需求,那麼咱們又改如何去選擇?

阿里雲提供了三種方式來解決咱們的需求,若是再不能知足,那麼就知道修改源碼算法部分來達到本身的要求了。
/**
 * 使用哈希算法來選擇隊列,順序消息一般都這樣作<br>
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-6-27
 */
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

/**
 * 根據機房來選擇發往哪一個隊列,支付寶邏輯機房使用
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-25
 */
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // TODO Auto-generated method stub
        return null;
    }


    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }


    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

/**
 * 發送消息,隨機選擇隊列
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-25
 */
public class SelectMessageQueueByRandoom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

能夠看到,rocketmq爲咱們提供了三個選擇(除去輪詢方法),那麼若是咱們在很是關注消息順序的時候,咱們能夠選擇經過哈希算法求值的方式來實現

    SelectMessageQueueByHash

    咱們每一個傳遞進入的對象都會被哈希算法計算出 一個哈希值,好比咱們傳遞的是訂單號,那麼無疑咱們能夠保證相同的訂單號能夠傳遞給相同的topic去處理,那麼只要再保證是一致的tag就能夠保證順序的一致性啦;

    目的是:生產者 -- MQ服務端 -- 消費者 能夠達到一一對應的關係

第二種是機房選擇,算法是木有啦,應該是根據ip地址去區分,反正概念我不是很清晰,也沒有去注意和了解;有了解的親留個資料給我吧,連接就好,謝謝撒……

第三種是隨機選擇,也就是誰也不知道它到底會選擇誰,這種效率其實不好,沒有負載均衡,誰也不知道會不會堵塞起來,誰也不知道某個隊列是否已經塞滿。

有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決它們身上,其實是浪費的,效率低下的。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:

    一、不關注亂序的應用實際大量存在
    二、隊列無序並不意味着消息無序

參考連接以下:

http://www.jianshu.com/p/453c6e7ff81c

http://blog.csdn.net/column/details/learningrocketmq.html?&page=1

 

路走多了,早晚有人會翻車……

相關文章
相關標籤/搜索