storm之8:並行度


(一)storm拓撲的並行度能夠從如下4個維度進行設置:
一、node(服務器):指一個storm集羣中的supervisor服務器數量。
二、worker(jvm進程):指整個拓撲中worker進程的總數量,這些數量會隨機的平均分配到各個node。
三、executor(線程):指某個spout或者bolt的總線程數量,這些線程會被隨機平均的分配到各個worker。
四、task(spout/bolt實例):task是spout和bolt的實例,它們的nextTuple()和execute()方法會被executors線程調用。除非明確指定,storm會給每一個executor分配一個task。若是設置了多個task,即一個線程持有了多個spout/bolt實例.
注意:以上設置的都是總數量,這些數量會被平均分配到各自的宿主上,而不是設置每一個宿主進行多少個進程/線程。詳見下面的例子。


(二)並行度的設置方法
一、node:買機器吧,而後加入集羣中……
二、worker:Config#setNumWorkers() 或者配置項 TOPOLOGY_WORKERS
三、executor:Topology.setSpout()/.setBolt()
四、task:ComponentConfigurationDeclarer#setNumWorker()

(三)例子:
        // 三、建立topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);//設置executor數量爲5
        builder.setBolt("filter-bolt", new FilterBolt(), 3).shuffleGrouping(
                "kafka-reader");//設置executor數量爲3
        builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
                .shuffleGrouping("filter-bolt");//設置executor數量爲5
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping(
                "log-splitter");//設置executor數量爲2

        // 四、啓動topology
        Config conf = new Config();
        conf.put(Config.NIMBUS_HOST, nimbusHost);
        conf.setNumWorkers(3);      //設置worker數量
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf,
                builder.createTopology());

一、經過config.setNumWorkers(3)將worker進程數量設置爲3,假設集羣中有3個node,則每一個node會運行一個worker。
二、executor的數量分別爲:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
總共爲13個executor,這13個executor會被隨機分配到各個worker中去。
注:這段代碼是從kafka中讀取消息源的,而這個topic在kafka中的分區數量設置爲5,所以這裏spout的線程婁爲5.
三、這個示例都沒有單獨設置task的數量,即便用每一個executor一個task的默認配置。若須要設置,能夠:
        builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
                .shuffleGrouping("filter-bolt").setNumTasks(5);
來進行設置,這5個task會被分配到3個executor中。

(四)並行度的動態調整
對storm拓撲的並行度進行調整有2種方法:
一、kill topo—>修改代碼—>編譯—>提交拓撲
二、動態調整
第1種方法太不方便了,有時候topo不能說kill就kill,另外,若是加幾臺機器,難道要把全部topo kill掉還要修改代碼?
所以storm提供了動態調整的方法,動態調整有2種方法:
一、ui方式:進入某個topo的頁面,點擊rebalance便可,此時能夠看到topo的狀態是rebalancing。但此方法只是把進程、線程在各個機器上從新分配,即適用於增長機器,或者減小機器的情形,不能調整worker數量、executor數量等
二、cli方式:storm rebalance
舉個例子
storm rebalance toponame -n 7 -e filter-bolt=6 -e hdfs-bolt=8
將topo的worker數量設置爲7,並將filter-bolt與hdfs-bolt的executor數量分別設置爲六、8.
此時,查看topo的狀態是rebalancing,調整完成後,能夠看到3臺機器中的worker數量分別爲三、二、2

node

相關文章
相關標籤/搜索