MQTT研究之EMQ:【eclipse的paho之java客戶端使用注意事項】

這裏,簡單記錄一下本身在最近項目中遇到的paho的心得,這裏也涵蓋EMQX的問題。服務器

 

1. cleanSessionsession

這個標識,是確保client和server之間是否持久化狀態的一個標誌,不論是client仍是server重啓仍是鏈接斷掉。下面是來自paho客戶端源碼的註釋。併發

Sets whether the client and server should remember state across restarts and reconnects.
  • If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
    • Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
    • The server will treat a subscription as durable.
  • If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
    • Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
    • The server will treat a subscription as non-durable

1)。 這個標誌位,設置爲true,那麼,當鏈接斷掉,例如,調用EMQX的接口踢掉鏈接,此時,即使重連上了(不管是經過autoconnect設置爲true,仍是在connectonLost這個回調函數裏面配置上重連的邏輯),MQTT客戶端程序都是沒法進行從新訂閱數據的。這個行爲,說明session裏面保存了會話所採用的topic信息。函數

2)。這個標誌位,設置爲true,autoconnect設置爲false,在connectLost這個回調函數裏面,自行實現從新鏈接的邏輯,而且再次針對相同的topic和qos進行訂閱的話,當鏈接被踢掉,這個時候,會從新鏈接上,而且也會訂閱上數據,只是會出現很奇怪的現象,CPU佔用率比鏈接斷開前提升不少。 個人應用(訂閱到數據後,對數據進行相應的邏輯處理,正常狀況下,一條數據大概1~5ms處理完)壓測環境下,鏈接未斷前,1.3W的併發,CPU空閒率在40%左右,重連以後,CPU的空閒率只有10%左右,這個地方是個大坑,目前我尚未搞清楚究竟是什麼緣由致使,如有人遇到相似問題同仁,請給我留言,告知可能的緣由。(個人paho是1.2.0版本,EMQX:V3.1.1)性能

3)。這個標誌位,設置爲false,autoconnect設置爲false,在connectLost這個回調函數裏面,自行實現重連的邏輯,可是不對topic進行從新訂閱,即使鏈接斷掉,從新鏈接上的話,依然會進行鏈接斷開以前的業務邏輯,訂閱到所需的數據,CPU的負荷也不會變大,基本和斷開以前的狀態持平。測試

 

下面配上connectLost這個回調函數(MqttCallback接口的一個方法)相關代碼:this

public void connectionLost(Throwable cause) {
        // 鏈接丟失後,通常在這裏面進行重連
        System.out.println(">>>>>>>>>>>>>>>" + cause.getMessage());
        System.out.println("鏈接斷開,能夠作重連");
        for (int i = 0; i < 3; i++) {
            if(reconnect()) {
                break;
            }else{
                try {
                    Thread.sleep(i * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private boolean reconnect() {
        try {
            mqttClient.connect(mqttConnectOptions);
            Thread.sleep(100);
            if( mqttClient.isConnected() ) {
                //mqttClient.subscribe(this.topic, 0);
                return true;
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  false;
    }

 

2.  EMQX的承壓能力spa

網上標榜EMQX單節點處理能力多麼牛逼,100W鏈接毫無壓力,這個數值,其實呢,我以爲要仔細看測試場景,單單看鏈接數,其實沒有什麼意義,要看生產者消費者都存在的狀況下,還有數據流通這種場景,鏈接能力或者數據處理能力如何。 我不是說100W鏈接能力是虛構的,我是想說純粹的鏈接其實沒有多大的價值,由於EMQX是消息總線,只有鏈接,不存在數據流動,有多大意義呢?設計

仍是接着我上面的應用壓測,咱們團隊開發的一個規則引擎,1.6W的消息併發(4000設備,每一個設備每秒4條消息,固然是程序模擬出來的),規則引擎4C16G的服務器2臺,每臺跑3個實例,共享訂閱兩個EMQX節點(EMQX是集羣),EMQX服務器配置4C16G。結果跑不了多久時間(1個小時不到,有時半個小時),就會出現EMQX平凡踢掉消費者鏈接的狀況。rest

2019-09-19 14:22:42.710 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52388 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:22:55.746 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60404 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:08.131 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52394 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:14.374 [error] 72a25aca01164c8c8b4cf48451c4e316@10.95.198.25:52456 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:41.686 [error] cd56963bfb4e4c0c8275abe9a24078de@10.95.198.26:60462 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:52.638 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60514 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:06.015 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52496 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:13.541 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52474 [Connection] Shutdown exceptionally due to message_queue_too_long

針對這個問題,我諮詢過青雲的EMQ團隊的工程師,也在Github上諮詢過EMQX的維護者,都反饋說是消費者處理速度太慢,emq的消息隊列消息堆積致使。現象如此,怎麼解決呢,彷佛只能添加消費者服務,或者下降消息壓力,EMQX可否提高性能呢?我以爲EMQX如今共享訂閱的能力不行,就這4000個鏈接投遞消息,1.6W的併發,4000個topic,採用共享訂閱的方式,性能感受不是很好,是咱們程序設計的有問題,仍是EMQX共享訂閱性能真的有待提高?爲何這麼說能,咱們測試過非共享訂閱,就是明確訂閱某個指定topic。非共享訂閱狀況下,相同的服務器上,比共享訂閱性能好不少不少(差很少一半)。。。(歡迎探討)

 

從EMQX的配置中,針對上面這種消息隊列太長的問題,emqx.conf的配置文件中有相關信息,參考下面這個錯誤找到了相關的配置參數,EMQX的官方參數解釋或者支持真心跟不上,沒有國外開源組織社區營造的好,這個須要努力。

2019-09-19 14:22:30.362 [error] f2ac199b0314449d822e150c8d51de93 crasher:
    initial call: emqx_session:init/1
    pid: <0.20141.1>
    registered_name: []
    exception exit: killed
      in function  emqx_session:handle_info/2 (src/emqx_session.erl, line 641)
      in call from gen_server:try_dispatch/4 (gen_server.erl, line 637)
      in call from gen_server:handle_msg/6 (gen_server.erl, line 711)
    ancestors: [emqx_session_sup,emqx_sm_sup,emqx_sup,<0.1386.0>]
    message_queue_len: 0
    messages: []
    links: [<0.1577.0>]
    dictionary: [{force_shutdown_policy,
                      #{max_heap_size => 838860800,message_queue_len => 8000}},
                  {deliver_stats,676193},
                  {'$logger_metadata$',
                      #{client_id => <<"f2ac199b0314449d822e150c8d51de93">>}}]
    trap_exit: true
    status: running
    heap_size: 6772
    stack_size: 27
    reductions: 69920965
  neighbours:

再看看emqx.conf的配置文件中,和這個queue相關的配置:

## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
## of queued MQTT messages of QoS 1 and 2.
##
## Numbers delimited by `|'. Zero or negative is to disable.
##
## Default:
##   - 8000|800MB on ARCH_64 system
##   - 1000|100MB on ARCH_32 sytem
## zone.external.force_shutdown_policy = 8000|800MB

有人會說,你能夠將這裏的消息數量調大點啊,沒錯,這個調一下是能夠改善,可是不能根治問題,本身想一想吧,大點最多也就是對消息速率波動的韌性加大了,可是不能解決持續生成高於所謂的消費慢這種狀況下的問題。 EMQ方說辭其實,在咱們的這個場景下,我是不那麼認同的,爲何這麼說呢, 個人規則引擎消費日誌裏面顯示,每條消息處理的時間並無變長,CPU的忙碌程度並無惡化, 添加共享訂閱實例變多,EMQX性能降低了,我以爲EMQX在共享訂閱變多的狀況下,對消費者端投遞消息的速率或者效率降低了,可是呢,EMQX這個broker從消息生產者這邊接收消息的能力沒有改變,致使EMQX的消息隊列消息積壓,最終出現踢鏈接的policy得以執行。。。

 

還有一個問題,不知道細心的讀者有沒有發現,消費者這邊消息消費的好好的,消息積壓了,EMQX爲什麼要把消費者的鏈接給踢掉呢,爲什麼不是將生產者的鏈接踢掉呢?這個邏輯我以爲有點不是很好理解,原本消息就積壓了,是否是要加快消費才能緩解或者解除消息積壓的問題?讀者大家是如何理解的,也能夠留言探討!

相關文章
相關標籤/搜索