和一樣是計算框架的Mapreduce相比,Mapreduce集羣上運行的是Job,而Storm集羣上運行的是Topology。可是Job在運行結束以後會自行結束,Topology卻只能被手動的kill掉,不然會一直運行下去。html
Storm集羣中有兩種節點,一種是控制節點(Nimbus節點),另外一種是工做節點(Supervisor節點)。全部Topology任務的提交必須在Storm客戶端節點上進行(須要配置~/.storm/storm.yaml文件),由Nimbus節點分配給其餘Supervisor節點進行處理。Nimbus節點首先將提交的Topology進行分片,分紅一個個的Task,並將Task和Supervisor相關的信息提交到zookeeper集羣上,Supervisor會去zookeeper集羣上認領本身的Task,通知本身的Worker進程進行Task的處理。整體的Topology處理流程圖爲:git
每一個Topology都由Spout和Bolt組成,在Spout和Bolt傳遞信息的基本單位叫作Tuple,由Spout發出的接二連三的Tuple及其在相應Bolt上處理的子Tuple連起來稱爲一個Steam,每一個Stream的命名是在其首個Tuple被Spout發出的時候,此時Storm會利用內部的Ackor機制保證每一個Tuple可靠的被處理。github
而Tuple能夠理解成鍵值對,其中,鍵就是在定義在declareStream方法中的Fields字段,而值就是在emit方法中發送的Values字段。web
在運行Topology以前,能夠經過一些參數的配置來調節運行時的狀態,參數的配置是經過Storm框架部署目錄下的conf/storm.yaml文件來完成的。在次文件中能夠配置運行時的Storm本地目錄路徑、運行時Worker的數目等。數據庫
在代碼中,也能夠設置Config的一些參數,可是優先級是不一樣的,不一樣位置配置Config參數的優先級順序爲:數組
default.yaml<storm.yaml<topology內部的configuration<內部組件的special configuration<外部組件的special configuration服務器
在storm.yaml中經常使用的幾個選項爲:併發
配置選項名稱app |
配置選項做用框架 |
topology.max.task.parallelism |
每一個Topology運行時最大的executor數目 |
topology.workers |
每一個Topology運行時的worker的默認數目,若在代碼中設置,則此選項值被覆蓋 |
storm.zookeeper.servers |
zookeeper集羣的節點列表 |
storm.local.dir |
Storm用於存儲jar包和臨時文件的本地存儲目錄 |
storm.zookeeper.root |
Storm在zookeeper集羣中的根目錄,默認是「/」 |
ui.port |
Storm集羣的UI地址端口號,默認是8080 |
nimbus.host: |
Nimbus節點的host |
supervisor.slots.ports |
Supervisor節點的worker佔位槽,集羣中的全部Topology公用這些槽位數,即便提交時設置了較大數值的槽位數,系統也會按照當前集羣中實際剩餘的槽位數來進行分配,當全部的槽位數都分配完時,新提交的Topology只能等待,系統會一直監測是否有空餘的槽位空出來,若是有,就再次給新提交的Topology分配 |
supervisor.worker.timeout.secs |
Worker的超時時間,單位爲秒,超時後,Storm認爲當前worker進程死掉,會從新分配其運行着的task任務 |
drpc.servers |
在使用drpc服務時,drpc server的服務器列表 |
drpc.port |
在使用drpc服務時,drpc server的服務端口 |
Spout是Stream的消息產生源, Spout組件的實現能夠經過繼承BaseRichSpout類或者其餘*Spout類來完成,也能夠經過實現IRichSpout接口來實現。
須要根據狀況實現Spout類中重要的幾個方法有:
當一個Task被初始化的時候會調用此open方法。通常都會在此方法中對發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化。
示例以下:
1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 2 3 _collector = collector; 4 5 }
此方法用於聲明當前Spout的Tuple發送流。Stream流的定義是經過OutputFieldsDeclare.declareStream方法完成的,其中的參數包括了發送的域Fields。
示例以下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) { 2 3 declarer.declare(new Fields("word")); 4 5 }
此方法用於聲明針對當前組件的特殊的Configuration配置。
示例以下:
1 public Map<String, Object> getComponentConfiguration() { 2 3 if(!_isDistributed) { 4 5 Map<String, Object> ret = new HashMap<String, Object>(); 6 7 ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 8 9 return ret; 10 11 } else { 12 13 return null; 14 15 } 16 17 }
這裏即是設置了Topology中當前Component的線程數量上限。
這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是經過這個方法來實現的。
示例以下:
1 public void nextTuple() { 2 3 Utils.sleep(100); 4 5 final String[] words = new String[] {"twitter","facebook","google"}; 6 7 final Random rand = new Random(); 8 9 final String word = words[rand.nextInt(words.length)]; 10 11 _collector.emit(new Values(word)); 12 13 }
這裏即是從一個數組中隨機選取一個單詞做爲Tuple,而後經過_collector發送到Topology。
另外,除了上述幾個方法以外,還有ack、fail和close方法等。Storm在監測到一個Tuple被成功處理以後會調用ack方法,處理失敗會調用fail方法,這兩個方法在BaseRichSpout類中已經被隱式的實現了。
Bolt類接收由Spout或者其餘上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現能夠經過繼承BasicRichBolt類或者IRichBolt接口來完成。
Bolt類須要實現的主要方法有:
此方法和Spout中的open方法相似,爲Bolt提供了OutputCollector,用來從Bolt中發送Tuple。Bolt中Tuple的發送能夠在prepare方法中、execute方法中、cleanup等方法中進行,通常都是些在execute中。
示例以下:
1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 2 3 _collector = collector; 4 5 }
用於聲明當前Bolt發送的Tuple中包含的字段,和Spout中相似。
示例以下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) { 2 3 declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds")); 4 5 }
此例說明當前Bolt類發送的Tuple包含了三個字段:"obj", "count", "actualWindowLengthInSeconds"。
和Spout類同樣,在Bolt中也能夠有getComponentConfiguration方法。
示例以下:
1 public Map<String, Object> getComponentConfiguration() { 2 3 Map<String, Object> conf = new HashMap<String, Object>(); 4 5 conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); 6 7 return conf; 8 9 }
此例定義了從系統組件「_system」的「_tick」流中發送Tuple到當前Bolt的頻率,當系統須要每隔一段時間執行特定的處理時,就能夠利用這個系統的組件的特性來完成。
這是Bolt中最關鍵的一個方法,對於Tuple的處理均可以放到此方法中進行。具體的發送也是經過emit方法來完成的。此時,有兩種狀況,一種是emit方法中有兩個參數,另外一個種是有一個參數。
(1)emit有一個參數:此惟一的參數是發送到下游Bolt的Tuple,此時,由上游發來的舊的Tuple在此隔斷,新的Tuple和舊的Tuple再也不屬於同一棵Tuple樹。新的Tuple另起一個新的Tuple樹。
(2)emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是發往下游Bolt的新的Tuple流。此時,新的Tuple和舊的Tuple是仍然屬於同一棵Tuple樹,即,若是下游的Bolt處理Tuple失敗,則會向上傳遞到當前Bolt,當前Bolt根據舊的Tuple流繼續往上游傳遞,申請重發失敗的Tuple。保證Tuple處理的可靠性。
這兩種狀況要根據本身的場景來肯定。
示例以下:
1 public void execute(Tuple tuple) { 2 3 _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 4 5 _collector.ack(tuple); 6 7 } 8 9 public void execute(Tuple tuple) { 10 11 _collector.emit(new Values(tuple.getString(0) + "!!!")); 12 13 }
此外還有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法相似,都是在當前Component關閉時調用,可是針對實時計算來講,除非一些特殊的場景要求之外,這兩個方法通常都不多用到。
上文中介紹了Topology的基本組件Spout和Bolt,在Topology中,數據流Tuple的處理就是不斷的經過調用不一樣的Spout和Bolt來完成的。不一樣的Bolt和Spout的上下游關係是經過在入口類中定義的。示例以下:
1 builder = new TopologyBuilder(); 2 3 builder.setSpout(spoutId, new TestWordSpout(), 5); 4 5 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); 6 7 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); 8 builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); 9
此例中的builder是TopologyBuilder對象,經過它的createTopology方法能夠建立一個Topology對象,同時此builder還要定義當前Topology中用到的Spout和Bolt對象,分別經過setSpout方法和setBolt方法來完成。
setSpout方法和setBolt方法中的第一個參數是當前的Component組件的Stream流ID號;第二個參數是具體的Component實現類的構造;第三個參數是當前Component的並行執行的線程數目,Storm會根據這個數字的累加和來肯定Topology的Task數目。最後的小尾巴*Grouping是指的一個Stream應如何分配數據給Bolt上面的Task。目前Storm的Stream Grouping有以下幾種:
(1)ShuffleGrouping:隨機分組,隨機分發Stream中的tuple,保證每一個Bolt的Task接收Tuple數量大體一致;
(2)FieldsGrouping:按照字段分組,保證相同字段的Tuple分配到同一個Task中;
(3)AllGrouping:廣播發送,每個Task都會受到全部的Tuple;
(4)GlobalGrouping:全局分組,全部的Tuple都發送到同一個Task中,此時通常將當前Component的併發數目設置爲1;
(5)NonGrouping:不分組,和ShuffleGrouping相似,當前Task的執行會和它的被訂閱者在同一個線程中執行;
(6)DirectGrouping:直接分組,直接指定由某個Task來執行Tuple的處理,並且,此時必須有emitDirect方法來發送;
(7) localOrShuffleGrouping:和ShuffleGrouping相似,若Bolt有多個Task在同一個進程中,Tuple會隨機發給這些Task。
不一樣的的Grouping,須要根據不一樣的場景來具體設定,不一而論。
Topology的運行能夠分爲本地模式和分佈式模式,模式的設置能夠在配置文件中設定,也能夠在代碼中設置。
(1)本地運行的提交方式:
1 LocalCluster cluster = new LocalCluster(); 2 3 cluster.submitTopology(topologyName, conf, topology); 4 5 cluster.killTopology(topologyName); 6 7 cluster.shutdown();
(2)分佈式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
須要注意的是,在Storm代碼編寫完成以後,須要打包成jar包放到Nimbus中運行,打包的時候,不須要把依賴的jar都打進去,不然若是把依賴的storm.jar包打進去的話,運行時會出現重複的配置文件錯誤致使Topology沒法運行。由於Topology運行以前,會加載本地的storm.yaml配置文件。
在Nimbus運行的命令以下:
storm jar StormTopology.jar maincalss args
有幾點須要說明的地方:
(1)Storm提交後,會把代碼首先存放到Nimbus節點的inbox目錄下,以後,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化以後的Topology代碼文件;
(2)在設定Topology所關聯的Spouts和Bolts時,能夠同時設置當前Spout和Bolt的executor數目和task數目,默認狀況下,一個Topology的task的總和是和executor的總和一致的。以後,系統根據worker的數目,儘可能平均的分配這些task的執行。worker在哪一個supervisor節點上運行是由storm自己決定的;
(3)任務分配好以後,Nimbes節點會將任務的信息提交到zookeeper集羣,同時在zookeeper集羣中會有workerbeats節點,這裏存儲了當前Topology的全部worker進程的心跳信息;
(4)Supervisor節點會不斷的輪詢zookeeper集羣,在zookeeper的assignments節點中保存了全部Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關係等,Supervisor經過輪詢此節點的內容,來領取本身的任務,啓動worker進程運行;
(5)一個Topology運行以後,就會不斷的經過Spouts來發送Stream流,經過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
最後一步會不間斷的執行,除非手動結束Topology。
Topology中的Stream處理時的方法調用過程以下:
有幾點須要說明的地方:
(1)每一個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
(2)open方法、prepare方法的調用是屢次的。入口函數中設定的setSpout或者setBolt裏的並行度參數指的是executor的數目,是負責運行組件中的task的線程 的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每一個executor運行的時候調用一次。至關於一個線程的構造方法。
(3)nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,纔會產 生無界的Tuple流,體現實時性。至關於線程的run方法。
(4)在提交了一個topology以後,Storm就會建立spout/bolt實例並進行序列化。以後,將序列化的component發送給全部的任務所在的機器(即Supervisor節 點),在每個任務上反序列化component。
(5)Spout和Bolt之間、Bolt和Bolt之間的通訊,是經過zeroMQ的消息隊列實現的。
(6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理以後,須要調用ack方法來標記成功,不然調用fail方法標記失敗,從新處理這個Tuple。
在Topology的執行單元裏,有幾個和並行度相關的概念。
(1)worker:每一個worker都屬於一個特定的Topology,每一個Supervisor節點的worker能夠有多個,每一個worker使用一個單獨的端口,它對Topology中的每一個component運行一個或者多個executor線程來提供task的運行服務。
(2)executor:executor是產生於worker進程內部的線程,會執行同一個component的一個或者多個task。
(3)task:實際的數據處理由task完成,在Topology的生命週期中,每一個組件的task數目是不會發生變化的,而executor的數目卻不必定。executor數目小於等於task的數目,默認狀況下,兩者是相等的。
在運行一個Topology時,能夠根據具體的狀況來設置不一樣數量的worker、task、executor,而設置的位置也能夠在多個地方。
(1)worker設置:
(1.1)能夠經過設置yaml中的topology.workers屬性
(1.2)在代碼中經過Config的setNumWorkers方法設定
(2)executor設置:
經過在Topology的入口類中setBolt、setSpout方法的最後一個參數指定,不指定的話,默認爲1;
(3)task設置:
(3.1) 默認狀況下,和executor數目一致;
(3.2)在代碼中經過TopologyBuilder的setNumTasks方法設定具體某個組件的task數目;
經過在Nimbus節點利用以下命令來終止一個Topology的運行:
storm kill topologyName
kill以後,能夠經過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集羣中的和當前Topology相關的信息以後,此Topology就會完全消失了。
Topology提交後,能夠在Nimbus節點的web界面查看,默認的地址是http://NimbusIp:8080。
上面給出瞭如何編寫Storm框架任務Topology的方法,那麼在哪些場景下可以使用Storm框架呢?下面介紹Storm框架的幾個典型的應用場景。
(1)利用Storm框架的DRPC進行大量的函數並行調用,即實現分佈式的RPC;
(2)利用Storm框架的Transaction Topology,能夠進行實時性的批量更新或者查詢數據庫操做或者應用須要同一批內的消息以及批與批之間的消息並行處理這樣的場景,此時Topology中只能有一個TrasactionalSpout;
(3)利用滑動窗口的邏輯結合Storm框架來計算得出某段時間內的售出量最多的產品、購買者最多的TopN地區等;
(4)精確的廣告推送,在用戶瀏覽產品的時候,將瀏覽記錄實時性的蒐集,發送到Bolt,由Bolt來根據用戶的帳戶信息(若是有的話)完成產品的分類統計,產品的相關性查詢等邏輯計算以後,將計算結果推送給用戶;
(5)實時日誌的處理,Storm能夠和一個分佈式存儲結合起來,實時性的從多個數據源發送數據處處理邏輯Bolts,Bolts完成一些邏輯處理以後,交給分佈式存儲框架進行存儲,此時,Spout能夠是多個;
(6)實時性的監控輿論熱點,好比針對某個關鍵詞,在用戶查詢的時候,產生數據源Spout,結合語義分析等,由Bolt來完成查詢關鍵詞的統計分析,彙總當前的輿論熱點;
(7)數據流的實時聚合操做。
http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/
https://github.com/nathanmarz/storm/wiki
http://nathanmarz.github.io/storm/doc/index-all.html
有理解不到位的地方,歡迎批評指正,一塊兒交流~