:性能優化的第一步就是找到瓶頸在哪裏,從瓶頸處入手,解決關鍵點問題,事半功倍。html
除了經過系統命令查看CPU使用,jstack查看堆棧的調用狀況之外,還能夠經過Storm自身提供的信息來對性能作出相應的判斷。java
在Storm 的UI中,對運行的topology提供了相應的統計信息node
三個重要參數:git
·Execute latency:消息(tuple)的平均處理時間,單位是毫秒。github
·Process latency:消息從收到到被ack掉所花費的時間,單位爲毫秒。若是沒有啓用Acker機制,那麼Process latency的值爲0。web
·Capacity:計算公式爲Capacity = Spout 或者 Bolt 調用 execute 方法處理的消息數量 × 消息平均執行時間/時間區間。若是這個值越接近1,說明Spout或者Bolt基本一直在調用 execute 方法,所以並行度不夠,須要擴展這個組件的 Executor數量。算法
////////////////////////////////////////////////////////////////////////////////性能優化
1、網絡
Storm能夠很容易地在集羣中橫向拓展它的計算能力,它會把整個運算過程分割成多個獨立的tasks在集羣中進行並行計算。在Storm中,一個task就是運行在集羣中的一個Spout或Bolt實例。 多線程
Topology的運行涉及到四種組件:
Node(machines):集羣中的節點,就是這些節點一塊兒工做來執行Topology。
Worker(JVMs):一個worker就是一個獨立的JVM進程。每一個節點均可以經過配置運行一個或多個worker,一個Topology能夠指定由多少個worker來執行。
Executors(threads):一個worker JVM中運行的線程。一個worker進程能夠執行一個或多個executor線程。一個Executor能夠運行一個「組件」內的多個tasks,Storm默認一個每一個executor分配一個task。
Task(bolt/spout實例):Tasks就是spouts和bolts的實例,它具體是被executor線程處理的。
2、
Storm實例:wordcount
Topology默認執行狀況以下: 一個節點會爲Topology分配一個worker,這個worker會爲每一個Task啓一個executor。
2.1 爲Topology增長worker
兩種途徑增長workers:經過程序設置或storm rebalance命令。
Config config = new Config();
config.setNumWorkers(2);
注意:在LocalMode下無論設置幾個worker,最終都只有一個worker進程。
2.2 配置executors和tasks
task是spout和bolt的實例,一個executor線程處理多個task,task是真正處理具體數據的一個過程。Task的數量在整個topology運行期間通常是不變的,可是組件的Executor是有可能發生變化的,即有:thread<=task。
2.2.1 設置executor(thread)數量
每一個組件產生多少個Executor?在程序中設置或storm rebalance命令
builder.setSpout(SENTENCE_SPOUT_ID,spout, 2);
2.2.2 設置task的數量
每一個組件建立多少個task?在程序中設置或storm rebalance命令
builder.setBolt(SPLIT_BOLT_ID,splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID,newFields("word"));
若是一開始分配2個workers,則Topology的運行狀況以下:
3、
一個實際topology的全景,topology由三個組件組成,
一個Spout:BlueSpout
兩個Bolt:GreenBolt、YellowBolt。
如上圖,咱們配置了兩個worker進程,兩個BlueSpout線程,兩個GreenBolt線程和六個YellowBolt線程,那麼分佈到集羣中的話,每一個工做進程都會有5個executor線程。具體代碼:
Config conf = new Config(); conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout(「blue-spout」, new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt(「green-bolt」, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(「blue-spout」);
topologyBuilder.setBolt(「yellow-bolt」, new YellowBolt(), 6) .shuffleGrouping(「green-bolt」);
StormSubmitter.submitTopology( 「mytopology」, conf, topologyBuilder.createTopology() );
Storm中也有一個參數來控制topology的並行數量: TOPOLOGY_MAX_TASK_PARALLELISM: 這個參數能夠控制一個組件上Executor的最大數量。它一般用來在本地模式測試topology的最大線程數量。固然咱們也能夠在代碼中設置:
config.setMaxTaskParallelism().
4、
如何改變一個運行topology中的Parallelism
Storm中一個很好的特性就是能夠在topology運行期間動態調製worker進程或Executor線程的數量而不須要重啓topology。這種機制被稱做rebalancing。 咱們有兩種方式來均衡一個topology:
1:經過Storm web UI
2:經過storm rebalance命令
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
DefaultScheduler:默認調度算法,採用輪詢的方式將系統中的可用資源均勻地分配給topology,但也不是絕對均勻。會先將其它topology不須要的資源從新收集起來。EventScheduler:和DefaultScheduler差很少,不會先將其它topology不須要的資源從新收集起來。
IsolationScheduler:用戶可定義topology的機器資源,storm分配的時候會優先分配這些機器,以保證分配給該topology的機器只爲這一個topology服務。
DefaultScheduler:
1:調用cluster的needsSchedualerTopologies方法得到須要進行任務分配的topologies
開始分別對每個topology進行處理
2:調用cluster的getAvailableSlots方法得到當前集羣可用的資源,以<node,port>集合的形式返回,賦值給available-slots
3:得到當前topology的executor信息並轉化爲<start-t ask-id,end-task-id>集合存入all-executors,根據topology計算executors信息,採用compute-executors算法。
4:調用DefaultScheduler的get-alive-assigned-node+port->executors方法得到該topology已經得到的資源,返回<node+port,executor>集合的形式存入alive-assigned
5:調用slot-can-reassign對alive-assigned中的slots信息進行判斷,選出其中能被從新分配的slot存入變量can-reassigned。這樣可用的資源就由available-slots和can-reassigned兩部分組成。
6:計算當前topology能使用的所有slot數目total-slots--to-use:min(topology的NumWorker數,available-slots+can-reassigned),若是total-slots--to-use>當前已分配的slots數目,則調用bad-slots方法計算可被釋放的slot
7:調用cluster的freeSlots方法釋放計算出來的bad-slot
8:最後調用EventScheduler的schedule-topologies-evenly進行分配
:先計算集羣中可供分配的slot資源,並判斷當前已分配給運行Topology的slot是否須要從新分配,而後對可分配的slot進行排序,再計算Topology的executor信息,最後將資源平均地分配給Topology。
接下來咱們提交3個topology
Topology |
Worker數 |
Executer數 |
Task數 |
T-1 |
3 |
8 |
16 |
T-2 |
5 |
10 |
10 |
T-3 |
3 |
5 |
10 |
1、提交T-1
1:計算slots。sort-slots算法對可用slots進行處理,結果爲{[s1 6700] [s2 6700] [s3 6700] [s4 6700] [s1 6701] [s2 6701] [s3 6701] [s4 6701] [s1 6702] [s2 6702] [s3 6702] [s4 6702] [s1 6703] [s2 6703] [s3 6703] [s4 6703]}
2:計算executor。compute-executors算法計算後獲得的Executor列表爲:{[1 2] [3 4] [5 6] [7 8] [9 10] [11 12] [13 14] [15 16]};注:格式爲[start-task-id end-task-id],共8個executor,第一個包含2個task,start-task-id爲1,end-task-id爲2,因此記爲[1 2],後面依次類推
3:計算worker。8個Executor在3個worker上的分佈狀態爲[3,3,2]
分配結果爲:
{[1 2] [3 4] [5 6]} -> [s1 6700]
{[7 8] [9 10] [11 12]} -> [s2 6700]
{[13 14] [15 16]} -> [s3 6700]
分配後集羣狀態爲:
2、提交T-2、T-3
分配後集羣狀態爲:
出現負載不均衡現象。
//////////////////////////////////////////////////////////////////
Jstorm(Jstorm介紹:http://wenku.baidu.com/view/59e81017dd36a32d7375818b.html)是阿里團隊對Storm使用純Java語言進行的重寫,基本內核思想和Storm沒有區別,架構以下,加了一些本身的優化。
圖1 Jstorm架構圖
Jstorm早期版本(0.9.5以前)宣稱支持從CPU、Memory、Disk以及Net四個緯度對資源進行分配和調度,而且任務分配粒度細到Task級別,可是新版本(本次分析基於最新發布版本0.9.6.2)緊支持CPU和Memory維度的任務分配,而且任務分配粒度也只到Worker級別,以前的相關API已經不推薦使用。
Jstorm的做者longda確認,Jstorm新版本確實刪除掉了以前繁雜的資源分配機制,目前支持CPU和Memory維度的的資源分配,而且資源分配的粒度也只是到Worker級別。
2 Jstorm調度機制
Jstorm沒有像Storm那樣提供可插拔的任務分配器,它實現了Storm的默認調度算法,對默認調度算法進行了優化和擴展,而且在此基礎上提供了豐富的調度定製化接口,用戶能夠方便的設置相應調度策略。
2.1 Jstorm的默認調度算法
Jstorm總體上繼承了Storm的默認調度算法,保證Topology平均的分配在集羣上,具體以下:
以Worker爲維度,儘可能將Worker平均分配到各個Supervisor上。
以Worker爲單位,確認Worker與Task數目大體的對應關係。
創建Task-Worker的關係,創建關係的優先級爲:儘可能避免同類Task在同一Work和Supervisor下的狀況,儘可能保證Task在Worker和Supervisor基準上平均分配,儘可能保證有直接信息流傳輸的Task在同一Worker中。
2.2 Jstorm的調度定製化接口
從Jstorm 0.9.0 開始, JStorm 提供很是強大的調度功能, 基本上能夠知足大部分的需求(官方所言)。
Jstorm從0.9.5版本以後,提供了以下調度定製化接口:
2.2.1 設置每一個Woker的默認內存大小
Jstorm提供以下接口來設置每一個Worker佔用的內存大小:
ConfigExtension.setMemSizePerWorker (Map conf, long memSize)
ConfigExtension.setMemSizePerWorkerByKB(Map conf, long memSize)
ConfigExtension.setMemSizePerWorkerByMB(Map conf, long memSize)
ConfigExtension.setMemSizePerWorkerByGB(Map conf, long memSize)
2.2.2 設置每一個Worker的memory,cpu權重
Jstorm提供以下接口來設置每一個Worker的cgroup,cpu權重
ConfigExtension.setCpuSlotNumPerWorker(Map conf, int slotNum)
2.2.3 設置是否使用舊的分配方式
Jstorm提供以下接口來設置是否使用舊的分配方式
ConfigExtension.setUseOldAssignment(Map conf, boolean useOld)
2.2.4 設置強制某個Component的Task運行在不一樣的節點上
Jstorm能夠強制某個component的task 運行在不一樣的節點上,接口以下:
ConfigExtension.setTaskOnDifferentNode(Map componentConf, boolean isIsolate)
注意:這個配置componentConf是component的配置, 須要執行addConfigurations 加入到spout或bolt的configuration當中
2.2.5 自定義Worker分配
自定義Worker分配的示例以下:
WorkerAssignment worker = new WorkerAssignment();
worker.addComponent(String compenentName, Integer num);//在這個worker上增長task
worker.setHostName(String hostName);//強制這個worker在某臺機器上
worker.setJvm(String jvm);//設置這個worker的jvm參數
worker.setMem(long mem); //設置這個worker的內存大小
worker.setCpu(int slotNum); //設置cpu的權重大小
ConfigExtension.setUserDefineAssignment(Map conf, List<WorkerAssignment> userDefines)
注:每個worker的參數並不須要被所有設置,worker屬性在合法的前提下即便只設置了部分參數也仍會生效。
Jstorm和Storm對比:
1.穩定性
均勻的將每一個組件(spout/bolt)的線程(並行度)分配到集羣中的各個節點。Jstorm會盡量的將同一個組件的線程分配到不一樣的節點及worker上以減小同質競爭(同一個組件線程作的是同樣的事情,好比可能都是cup密集型,那麼放到不一樣節點就能提供效率,更好的利用資源)。
舉個例子,一個集羣有三個節點,node-A有3個worker,node-B有2個worker,node-C有一個worker。當用戶提交一個topology(該topology須要4個worker,1個spout(X),一個bolt(Y),spout/bolt各佔2個線程)。初始時:在Storm與Jstorm是同樣的。
這時,若是node-C掛掉了,那麼node-C中的worker必需要重寫分配。若是是Storm的默認分配記過以下:
若是是Jstorm的默認調度來進行分配的化,結果以下:
顯然,JStorm的默認調度算法比Storm的更加優秀。
2.負載均衡
Jstorm儘可能保證每一個worker所分得的線程數基本一致,而且worker在各個supervisors之間也儘可能分配的均勻。例如,一個集羣有3個節點,node-A有3個worker,noder-B有3個woker,node-C與3個woker。用戶先提交了一個須要2個woker的topology,而後,又提交了一個須要4個worker的topology。
若是是Storm的默認調度算法來分配這兩個topology,結果以下:
顯然能夠看出,這個分配是不均勻的。。而Jstorm的默認分配就能獲得一個均勻的結果:
3.性能
Jstorm會試圖將兩個須要通信的線程儘可能放在一個worker中來減小網絡的傳輸。例如:一個集羣中有2個節點,node-A有2個worker,node-B有2個worker。當用戶提交一個topology(須要2個worker,1個spout(X),2個bolt(Y、Z),三個組件各一個線程)。整個topology的數據流爲X->Y->Z。若是Storm的默認調度算法來分配,可能的結果爲:
顯然中間須要網絡間傳輸,而JStorm的分配就能避免這個問題:
這裏Y與Z的通信是進程間通信。在進程間通信,消息不須要序列與反序列化。這樣會極大的提升效率。
想要(穩定性/性能/平衡)都同時知足是很困難的。Jstorm對於重要性排序是:穩定性>性能>負債均衡。
////////////////////////////////////////////////////////////////////////////////////////////
JStorm相比Storm調度更強大
1完全解決了storm 任務分配不均衡問題
2從4個維度進行任務分配:CPU、Memory(Disk、Net)
3默認一個task,一個cpu slot。當task消耗更多的cpu時,能夠申請更多cpu slot
:解決新上線的任務去搶佔老任務的cpu
:一淘有些task內部起不少線程,單task消耗太多cpu
4默認一個task,一個memory slot。當task須要更多內存時,能夠申請更多內存slot
先海狗項目中,slot task 須要8G內存,並且其餘任務2G內存就夠了
5默認task,不申請disk slot。當task 磁盤IO較重時,能夠申請disk slot
:海狗/實時同步項目中,task有較重的本地磁盤讀寫操做
6能夠強制topology運行在單獨一個節點上
:節省網絡帶寬
:Tlog中大量小topology,爲了減小網絡開銷,強制任務分配到一個節點上
7能夠強制某個component的task 運行在不一樣的節點上
聚石塔,海狗項目,某些task提供web Service服務,爲了端口不衝突,所以必須強制這些task運行在不一樣節點上
8能夠自定義任務分配:提早預定任務分配到哪臺機器上,哪一個端口,多少個cpu slot,多少內存,是否申請磁盤
:海狗項目中,部分task指望分配到某些節點上
9能夠預定上一次成功運行時的任務分配:上次task分配了什麼資源,此次仍是使用這些資源
:CDO不少任務期待重啓後,仍使用老的節點,端口
//////////////////////////////////////////////////////////////////////
storm的基礎框架以下:
Nimbus是主節點維護的一個守護進程,用於分配代碼、佈置任務及故障檢測。每一個工做節點都運行了一個名爲「Supervisor」的守護進程,用於監聽工做,開始並終止工做進程。Nimbus和Supervisor的協調工做是由Zookeeper來完成的。Zookeeper用於管理集羣中的不同組件,ZeroMQ是內部消息系統(netty)。
改進是在調度方面。參考思路:因爲storm的調度是平均分配的,所以在offline狀況下能夠根據節點之間是否連通、找出相似於最短路徑,從而動態調整拓撲圖,以改進調度。另外一方面,在online狀況下,能夠根據節點的負載狀況,當負載量大於某個門限值時,認爲到該節點不可達,從新選擇路徑,能夠考慮以節點的負載量做爲其餘幾點到該節點的距離,從而根據可達性等相似指標,使用相似於最短路徑的算法,動態的調整拓撲結構,從而改進調度效率。
////////////////////////////////////////////////////////////////////////
任務調度接口定義:
1 |
IScheduler{ |
2 |
// conf爲當前nimbus的storm配置 |
3 |
void prepare(Map conf); // 初始化 |
4 |
// topologyies表示集羣中全部topology信息,cluster表示當前集羣包括用戶自定義調度邏輯事所需的 全部資源(Slot、Supervisor、以及任務分配狀況)。 |
5 |
void schedule(Topologies topologies,Cluster cluster); |
6 |
}; |
Storm調度的相關術語
1、slot。這表明一個Supervisor節點上的一個單位資源。每一個slot對應一個port,一個slot只能被一個Worker佔用。
2、Worker,Executor.Task,1個Worker包含1個或多個Executor,每一個Executor包含1個或多個Task。
3、Executor的表現形式爲[1-1],[2-2],中括號內的數字表明該Executor中的起始Task id到末尾Task id,1個Worker就至關於在外面加個大括號{[1-1],[2-2]}
4.Component。Storm中的每一個組件就是指一類Spout或1個類型的Bolt。
EventScheduler
實現流程圖:
功能:對資源進行均勻分配的調度器,實現了IScheduler接口, schedule方法實現以下
1 |
defn– schedule[this ^Topologies topologyies ^Cluster cluster] |
2 |
(schedule-topologies-evenly topologies cluster) |
schedule-topologies-evenly方法原型:
1 |
defn schedule-topologies-evenly[^Topologies topologies ^Cluster cluster] |
方法說明:
調用cluster對象的needsSchedulingTopology方法獲取須要進行任務調度的Topology集合,判讀依據:Topology設置的NumWorkers數目是否大於已經分配給該Topology的Worker數目,以及該Topology還沒有分配的Executor數目是否大於0.
對須要進行任務調度的Topology獲取其topology-id,而後調用schedule-topology方法獲取到new-assignment(<executor,node+port>集合)。
用node和port信息構造WorkerSlot對象並將做爲slot.
對Executor集合中的每一項構造ExecutorDetail對象,並返回一個ExecutorDetails集合。
調用cluster的assign方法將計算出來的slot分配給與該Topology相對應的executors.
schedule-topology
方法原型:
1 |
defn- schedule-topology [^TopologyDetails topology ~Cluster cluster] |
方法說明:
調用cluster的getAvailableSlots方法獲取當前集羣可用的slot資源(集羣中還沒使用的Supervisor端口),並轉換爲<node,port>集合(available-slots).
將topology中的ExecutorDetails集合轉換爲<start-task-id,end-task-id>集合。
調用get-alive-assigned-node+port->executors方法獲取當前topology已經分配的資源狀況,返回<node+port,executors>集合(alive-assigned)。
獲取當前topology可使用的slot數目,topology設置的worker數目與當前available-slots數目加上alive-assigned數據兩者的最小值(total-slots-to-use)。
對available-slots進行排序,計算須要分配的solt數目(total-slots-to-use減去alive-assigned),從排序後的solt中順序獲取須要分配的solt作爲reassign-solts.
比較all-executors跟已分配的Executor集合間的差別,獲取須要進行分配的Executor集合,作爲reassign-executors.
將計算出來的reassign-solts與reassign-executor進行關聯,轉換爲<executor,slot>映射集合(映射方式爲:使executor均勻的分佈在slot上),保存到ressignment中.
///////////////////////////////////////////////////////////////
下面是調度器的核心實現。
import backtype.storm.scheduler.*;
import clojure.lang.PersistentArrayMap;
import java.util.*;
/**
* 直接分配調度器,能夠分配組件(spout、bolt)到指定節點中
* Created by zhexuan on 15/7/6.
*/
public class DirectScheduler implements IScheduler{
@Override
public void prepare(Map conf) {
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
System.out.println("DirectScheduler: begin scheduling");
// Gets the topology which we want to schedule
Collection<TopologyDetails> topologyDetailes;
TopologyDetails topology;
//做業是否要指定分配的標識
String assignedFlag;
Map map;
Iterator<String> iterator = null;
topologyDetailes = topologies.getTopologies();
for(TopologyDetails td: topologyDetailes){
map = td.getConf();
assignedFlag = (String)map.get("assigned_flag");
//如何找到的拓撲邏輯的分配標爲1則表明是要分配的,不然走系統的調度
if(assignedFlag != null && assignedFlag.equals("1")){
System.out.println("finding topology named " + td.getName());
topologyAssign(cluster, td, map);
}else {
System.out.println("topology assigned is null");
}
}
//其他的任務由系統自帶的調度器執行
new EvenScheduler().schedule(topologies, cluster);
}
/**
* 將組件(spout、bolt)分配到指定節點
*/
private void topologyAssign(Cluster cluster, TopologyDetails topology, Map map){
Set<String> keys;
PersistentArrayMap designMap;
Iterator<String> iterator;
iterator = null;
// make sure the special topology is submitted,
if (topology != null) {
designMap = (PersistentArrayMap)map.get("design_map");
if(designMap != null){
System.out.println("design map size is " + designMap.size());
keys = designMap.keySet();
iterator = keys.iterator();
System.out.println("keys size is " + keys.size());
}
if(designMap == null || designMap.size() == 0){
System.out.println("design map is null");
}
boolean needsScheduling = cluster.needsScheduling(topology);
if (!needsScheduling) {
System.out.println("Our special topology does not need scheduling.");
} else {
System.out.println("Our special topology needs scheduling.");
// find out all the needs-scheduling components of this topology
Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);
System.out.println("needs scheduling(component->executor): " + componentToExecutors);
System.out.println("needs scheduling(executor->components): " + cluster.getNeedsSchedulingExecutorToComponents(topology));
SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());
if (currentAssignment != null) {
System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());
} else {
System.out.println("current assignments: {}");
}
String componentName;
String nodeName;
if(designMap != null && iterator != null){
while (iterator.hasNext()){
componentName = iterator.next();
nodeName = (String)designMap.get(componentName);
System.out.println("如今進行調度 組件名稱->節點名稱:" + componentName + "->" + nodeName);
componentAssign(cluster, topology, componentToExecutors, componentName, nodeName);
}
}
}
}
}
/**
* 組件調度
* @param cluster
* 集羣的信息
* @param topology
* 待調度的拓撲細節信息
* @param totalExecutors
* 組件的執行器
* @param componentName
* 組件的名稱
* @param supervisorName
* 節點的名稱
*/
private void componentAssign(Cluster cluster, TopologyDetails topology, Map<String, List<ExecutorDetails>> totalExecutors, String componentName, String supervisorName){
if (!totalExecutors.containsKey(componentName)) {
System.out.println("Our special-spout does not need scheduling.");
} else {
System.out.println("Our special-spout needs scheduling.");
List<ExecutorDetails> executors = totalExecutors.get(componentName);
// find out the our "special-supervisor" from the supervisor metadata
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
SupervisorDetails specialSupervisor = null;
for (SupervisorDetails supervisor : supervisors) {
Map meta = (Map) supervisor.getSchedulerMeta();
if(meta != null && meta.get("name") != null){
System.out.println("supervisor name:" + meta.get("name"));
if (meta.get("name").equals(supervisorName)) {
System.out.println("Supervisor finding");
specialSupervisor = supervisor;
break;
}
}else {
System.out.println("Supervisor meta null");
}
}
// found the special supervisor
if (specialSupervisor != null) {
System.out.println("Found the special-supervisor");
List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
// 若是目標節點上已經沒有空閒的slot,則進行強制釋放
if (availableSlots.isEmpty() && !executors.isEmpty()) {
for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
}
}
// 從新獲取可用的slot
availableSlots = cluster.getAvailableSlots(specialSupervisor);
// 選取節點上第一個slot,進行分配
cluster.assign(availableSlots.get(0), topology.getId(), executors);
System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]");
} else {
System.out.println("There is no supervisor find!!!");
}
}
}
}
Storm自定義實現直接分配調度器,代碼修改自Twitter Storm核心貢獻者徐明明
在準備開發Storm自定義以前,事先已經瞭解了下現有Storm使用的調度器,默認是DefaultScheduler,調度原理大致以下:
* 在新的調度開始以前,先掃描一遍集羣,若是有未釋放掉的slot,則先進行釋放
* 而後優先選擇supervisor節點中有空閒的slot,進行分配,以達到最終平均分配資源的目標
現有scheduler的不足之處,上述的調度器基本能夠知足通常要求,可是針對下面個例仍是沒法知足:
* 讓spout分配到固定的機器上去,由於所需的數據就在那上面
* 不想讓2個Topology運行在同一機器上,由於這2個Topology都很耗CPU
DirectScheduler把劃分單位縮小到組件級別,1個Spout和1個Bolt能夠指定到某個節點上運行,若是沒有指定,仍是按照系統自帶的調度器進行調度.這個配置在Topology提交的Conf配置中可配.
打包此項目,將jar包拷貝到STORM_HOME/lib目錄下
在nimbus節點的storm.yaml配置中,進行以下的配置:
storm.scheduler: "storm.DirectScheduler"
而後是在supervisor的節點中進行名稱的配置,配置項以下:
supervisor.scheduler.meta:
name: "your-supervisor-name"
在集羣這部分的配置就結束了,而後重啓nimbus,supervisor節點便可,集羣配置只要1次配置便可.
int numOfParallel;
TopologyBuilder builder;
StormTopology stormTopology;
Config config;
//待分配的組件名稱與節點名稱的映射關係
HashMap<String, String> component2Node;
//任務並行化數設爲10個
numOfParallel = 2;
builder = new TopologyBuilder();
String desSpout = "my_spout";
String desBolt = "my_bolt";
//設置spout數據源
builder.setSpout(desSpout, new TestSpout(), numOfParallel);
builder.setBolt(desBolt, new TestBolt(), numOfParallel)
.shuffleGrouping(desSpout);
config = new Config();
config.setNumWorkers(numOfParallel);
config.setMaxSpoutPending(65536);
config.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 40000);
config.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 40000);
component2Node = new HashMap<>();
component2Node.put(desSpout, "special-supervisor1");
component2Node.put(desBolt, "special-supervisor2");
//此標識表明topology須要被調度
config.put("assigned_flag", "1");
//具體的組件節點對信息
config.put("design_map", component2Node);
StormSubmitter.submitTopology("test", config, builder.createTopology());
https://github.com/linyiqun/storm-scheduler
////////////////////////////////////////////////////////////////////////////////////////////////
Storm中nimbus負責Topology分配,主要兩階段:
1. 邏輯分配階段
這裏又會涉及到兩個概念executor和task,簡單講對於一個具體的component來講,task就是component在運行時的實例個數,即component靜態的class代碼,task是運行時的具體object對象,task的個數便是component在runtime時被實例化的對象個數,
而executor能夠理解爲線程的概念,一個component對應的executor個數就是component運行時所獨佔的線程數,舉例來說,某個component的task個數是6,executor個數是2,則運行時component就有6個實例運行在2個線程中,一個線程負責執行3個task,默認狀況下通常會將task個數配置爲executor的個數,即每個線程只負責執行一個component的實例化對象。
:邏輯階段所做的工做就是計算Topology中全部的component的executor個數,task個數,而後將全部的task分配到executor中。
2. 物理分配階段
executor表明的是線程,具體要落地執行還須要依附於進程,所以物理分配階段作的工做就是將全部的executor分配到worker slot進程中(一個slot表明一個jvm虛擬機)。
因爲在邏輯分配階段,task就是按照topology進行了排序,即相同component所屬的task排列在一塊兒,而在物理分配階段slot資源也是按照端口進行了排序,即相同端口的slot排在了一塊兒,
而具體分配算法是將排好序的task一次輪序分配到排好序的slot中,所以同一個component所屬的不一樣task會盡量的分到不一樣機器的相同端口上的slot中,實現了整個Topology的負載均衡,這樣分配的好處是防止同一個component的全部task都分配到同一臺機器上,形成整個集羣負載不均。