storm java 編程思路

整體思路web

storm編程和hadoop的mapreduce的編程很相似,hadoop的mapreduce須要本身實現map函數,reduce函數,還有一個主類驅動;storm須要本身實現spout,bolt和一個主函數。storm編程爲如下三步:編程

建立一個Spout讀取數據
建立bolt處理數據
建立一個主類,在主類中建立拓撲和一個集羣對象,將拓撲提交到集羣分佈式

Topology運行方式
Topology的運行能夠分爲本地模式和分佈式模式,模式的設置能夠在配置文件中設定,也能夠在代碼中設置。本地模式其實什麼都不須要安裝,有storm jar包就夠了 函數

(1)本地運行的提交方式:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology);
cluster.killTopology(topologyName);
cluster.shutdown();
 
(2)分佈式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
須要注意的是,在Storm代碼編寫完成以後,須要打包成jar包放到Nimbus中運行,打包的時候,不須要把依賴的jar都打進去,不然若是把依賴的storm.jar包打進去的話,運行時會出現重複的配置文件錯誤致使Topology沒法運行。由於Topology運行以前,會加載本地的storm.yaml配置文件。
在Nimbus運行的命令以下:
storm jar StormTopology.jar maincalss args
Topology運行流程
  有幾點須要說明的地方:
(1)Storm提交後,會把代碼首先存放到Nimbus節點的inbox目錄下,以後,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化以後的Topology代碼文件;
(2)在設定Topology所關聯的Spouts和Bolts時,能夠同時設置當前Spout和Bolt的executor數目和task數目,默認狀況下,一個Topology的task的總和是和executor的總和一致的。以後,系統根據worker的數目,儘可能平均的分配這些task的執行。worker在哪一個supervisor節點上運行是由storm自己決定的;
(3)任務分配好以後,Nimbes節點會將任務的信息提交到zookeeper集羣,同時在zookeeper集羣中會有workerbeats節點,這裏存儲了當前Topology的全部worker進程的心跳信息;
(4)Supervisor節點會不斷的輪詢zookeeper集羣,在zookeeper的assignments節點中保存了全部Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關係等,Supervisor經過輪詢此節點的內容,來領取本身的任務,啓動worker進程運行;
(5)一個Topology運行以後,就會不斷的經過Spouts來發送Stream流,經過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
最後一步會不間斷的執行,除非手動結束Topology。
Topology方法調用流程
Topology中的Stream處理時的方法調用過程以下:oop

 有幾點須要說明的地方:
   (1)每一個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
   (2)open方法、prepare方法的調用是屢次的。入口函數中設定的setSpout或者setBolt裏的並行度參數指的是executor的數目,是負責運行組件中的task的線程         的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每一個executor運行的時候調用一次。至關於一個線程的構造方法。
   (3)nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,纔會產生無界的Tuple流,體現實時性。至關於線程的run方法。
   (4)在提交了一個topology以後,Storm就會建立spout/bolt實例並進行序列化。以後,將序列化的component發送給全部的任務所在的機器(即Supervisor節點),在每個任務上反序列化component。
   (5)Spout和Bolt之間、Bolt和Bolt之間的通訊,是經過zeroMQ的消息隊列實現的。
   (6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理以後,須要調用ack方法來標記成功,不然調用fail方法標記失敗,從新處理這個Tuple。
Topology並行度
    在Topology的執行單元裏,有幾個和並行度相關的概念。
(1)worker:每一個worker都屬於一個特定的Topology,每一個Supervisor節點的worker能夠有多個,每一個worker使用一個單獨的端口,它對Topology中的每一個component運行一個或者多個executor線程來提供task的運行服務。
(2)executor:executor是產生於worker進程內部的線程,會執行同一個component的一個或者多個task。
(3)task:實際的數據處理由task完成,在Topology的生命週期中,每一個組件的task數目是不會發生變化的,而executor的數目卻不必定。executor數目小於等於task的數目,默認狀況下,兩者是相等的。
    在運行一個Topology時,能夠根據具體的狀況來設置不一樣數量的worker、task、executor,而設置的位置也能夠在多個地方。
(1)worker設置:
(1.1)能夠經過設置yaml中的topology.workers屬性
(1.2)在代碼中經過Config的setNumWorkers方法設定
(2)executor設置:
    經過在Topology的入口類中setBolt、setSpout方法的最後一個參數指定,不指定的話,默認爲1;
(3)task設置:
    (3.1) 默認狀況下,和executor數目一致;
    (3.2)在代碼中經過TopologyBuilder的setNumTasks方法設定具體某個組件的task數目;
終止Topology
    經過在Nimbus節點利用以下命令來終止一個Topology的運行:
storm kill topologyName
    kill以後,能夠經過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集羣中的和當前Topology相關的信息以後,此Topology就會完全消失了。
Topology跟蹤ui

    Topology提交後,能夠在Nimbus節點的web界面查看,默認的地址是http://NimbusIp:8080spa

相關文章
相關標籤/搜索