阿里雲RocketMQ的性能測試

因業務須要,須要進行阿里雲RocketMQ的性能測試。

環境,一臺windows系統CPU:I7,內存:8G,64位操做系統。

測試兩種場景,爲了保證訂閱關係一致性(能夠去阿里雲官網瞭解訂閱關係一致性),消費分爲兩種模式。

一、按Tag訂閱,訂閱全部Tag,測試使用3個消費者,3個生產者,每一個生產者發送一萬條數據,放到同一個Tag裏,對應同一個ShardingKey,保證順序消費。

測試結果:

阿里雲RocketMQ的性能測試

二、按Tag訂閱,每一個消費者訂閱一個Tag,每一個Tag在一個分組裏面。

測試結果:

阿里雲RocketMQ的性能測試

其中要特別注意第二條,訂閱關係一致性問題,若是3個消費者在一個組內,訂閱的tag不一致,消費是有問題的,也可能就不消費。windows

結論:

TPS不到100,這個基於本地到阿里雲,走的公網,效果通常。服務器

基於阿里雲內網的測試

阿里雲RocketMQ的性能測試

發送和消費一萬條數據大概都是22秒左右,TPS大概500。markdown

發送消息代碼:

public static void sendMqMessage( String topic, String tag, String message, String sharding ) {

        String key = UUID.randomUUID().toString();
        Message msg = new Message(
                // Message 所屬的 Topic
                topic,
                // Message Tag, 可理解爲 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列 RocketMQ 的服務器過濾
                tag,
                // Message Body 能夠是任何二進制形式的數據, 消息隊列 RocketMQ 不作任何干預,須要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                message.getBytes()
        );
        // 設置表明消息的業務關鍵屬性,請儘量全局惟一。
        // 以方便您在沒法正常收到消息狀況下,可經過控制檯查詢消息並補發。
        // 注意:不設置也不會影響消息正常收發
        msg.setKey(key);
        // 分區順序消息中區分不一樣分區的關鍵字段,sharding key 於普通消息的 key 是徹底不一樣的概念。
        // 全局順序消息,該字段能夠設置爲任意非空字符串。
        String shardingKey = sharding;
        try {
            SendResult sendResult = producer.send(msg, shardingKey);
            // 發送消息,只要不拋異常就是成功
            if (sendResult != null) {
                //System.out.println(message + tag + sharding);
                if(message.equals("10000")){
                    System.out.println(tag + ":發送完畢10000!");
                }
                //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                //System.out.println(dateFormat.format(new Date()) + "-發送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId());
            }
        }
        catch (Exception e) {
            // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            throw e;
        }
    }

消費代碼:

public void consumerMqMessage() {
        String topic = "topic-test";
        String tags = "W3";//第一種模式使用*
        consumer = RocketMqConsumerSingleton.getInstance();
        // 在訂閱消息前,必須調用 start 方法來啓動 Consumer,只需調用一次便可。
        consumer.subscribe(
            // Message 所屬的 Topic
            topic,
            // 訂閱指定 Topic 下的 Tags:
            // 1. * 表示訂閱全部消息
            // 2. TagA || TagB || TagC 表示訂閱 TagA 或 TagB 或 TagC 的消息
            tags,
            new MessageOrderListener() {
                /**
                 * 1. 消息消費處理失敗或者處理出現異常,返回 OrderAction.Suspend<br>
                 * 2. 消息處理成功,返回 OrderAction.Success
                 */
                @Override
                public OrderAction consume(Message message, ConsumeOrderContext context) {
                    String msg = new String(message.getBody());
                    //System.out.println(msg);

                    if(msg.equals("1")) {
                        System.out.println(message.getTag() + "開始:" + System.currentTimeMillis());
                    }
                    if(msg.equals("10000")) {
                        System.out.println(message.getTag() + "結束:" + System.currentTimeMillis());
                    }
                    //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    //System.out.println(dateFormat.format(new Date()) + "-消費消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID());
                    try {
                        //Thread.sleep(2000);
                        //System.out.println("-------------消費者睡2秒後----------");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return OrderAction.Success;
                }
            });
        consumer.start();
    }
相關文章
相關標籤/搜索