[TOC]
A Storm cluster looks superficially similar to a Hadoop cluster. But whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" are very different things; one key difference is that a MapReduce job eventually finishes, while a topology processes messages forever (or until you kill it).
A Storm cluster has two kinds of nodes: the master node and the worker nodes. The master node runs a daemon called "Nimbus", which is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to workers, and monitoring for failures.
Note: before Hadoop 2.0 the JobTracker handled job distribution, but from 2.x onward an entirely new resource-scheduling framework, YARN, is used instead; this is especially worth keeping in mind.
Each worker node runs a daemon called the "Supervisor". The Supervisor listens for work assigned to its machine and starts or stops worker processes based on what Nimbus has assigned to it. Each worker process executes a subset of a topology (that is, a sub-topology); a running topology consists of many worker processes spread across many machines.
A ZooKeeper cluster handles all coordination between Nimbus and the Supervisors (a complete topology may be split into several sub-topologies, each handled by a different Supervisor).
In addition, both the Nimbus daemon and the Supervisor daemons are fail-fast and stateless; all state lives in ZooKeeper or on local disk. This means you can kill -9 the nimbus and supervisor processes, restart them, and they will recover their state and carry on working as if nothing had happened. This design makes Storm extremely robust. In this design the master never talks to the workers directly; it goes through ZooKeeper as an intermediary, which decouples the master from the workers and keeps state in the ZooKeeper cluster so that whichever side fails can recover quickly.
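A quick way to see this shared state is to list the znodes Storm keeps in ZooKeeper. The following is only a minimal sketch, assuming the default `storm.zookeeper.root` of `/storm` and a ZooKeeper server reachable at `uplooking01:2181` (the host used in the cluster configured below); it is not part of Storm's own tooling.

```java
import org.apache.zookeeper.ZooKeeper;

// Minimal sketch: list the znodes Storm keeps under its root path in ZooKeeper.
// Assumes the default storm.zookeeper.root of "/storm" and a ZooKeeper server at uplooking01:2181.
public class ZkStateBrowser {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("uplooking01:2181", 30000, event -> { });
        // Children typically include entries such as assignments, storms, supervisors, workerbeats, errors, ...
        for (String child : zk.getChildren("/storm", false)) {
            System.out.println("/storm/" + child);
        }
        zk.close();
    }
}
```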
See the official documentation: http://storm.apache.org/releases/1.0.6/Setting-up-a-Storm-cluster.html
The official documentation explains each configuration option very clearly and is easy to follow.
Download: https://storm.apache.org/downloads.html

Make sure a ZooKeeper environment is already installed; in my environment a ZooKeeper cluster has already been set up.

```
1. Extract the archive
[uplooking@uplooking01 soft]$ tar -zxvf apache-storm-1.0.2.tar.gz -C ../app/
[uplooking@uplooking01 app]$ mv apache-storm-1.0.2/ storm

2. Edit the configuration files
# storm-env.sh
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/home/uplooking/app/storm/conf"

# storm.yaml
storm.zookeeper.servers:
    - "uplooking01"
    - "uplooking02"
    - "uplooking03"

nimbus.seeds: ["uplooking01", "uplooking02"]

storm.local.dir: "/home/uplooking/data/storm"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

3. Create storm.local.dir
mkdir -p /home/uplooking/data/storm

4. Configure environment variables
# .bash_profile
export STORM_HOME=/home/uplooking/app/storm
export PATH=$PATH:$STORM_HOME/bin
# Sync it to the other nodes
scp .bash_profile uplooking@uplooking02:/home/uplooking
scp .bash_profile uplooking@uplooking03:/home/uplooking

5. Copy the Storm installation directory to the other nodes
scp -r storm/ uplooking@uplooking02:/home/uplooking/app
scp -r storm/ uplooking@uplooking03:/home/uplooking/app

6. Start the Storm cluster
# uplooking01
storm nimbus &
storm ui &
# uplooking02
storm nimbus &
storm supervisor &
# uplooking03
storm supervisor &

7. Start the logviewer (optional)
On every worker node run "nohup bin/storm logviewer >/dev/null 2>&1 &" to start the logviewer daemon in the background.
(The nimbus node does not need a logviewer: it only exists to make task execution logs easy to inspect, and those logs live on the supervisor nodes.)
```
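To double-check that the values in storm.yaml are the ones the daemons will actually resolve, a small program can load the merged Storm configuration and print the keys set above. This is only a sketch, assuming the storm-core jar is on the classpath and STORM_CONF_DIR points at the conf directory configured earlier.

```java
import org.apache.storm.Config;
import org.apache.storm.utils.Utils;

import java.util.Map;

// Sketch only: print the values Storm resolves for the settings configured above.
// Assumes storm-core on the classpath and STORM_CONF_DIR pointing at the conf directory.
public class ShowStormConfig {
    public static void main(String[] args) {
        Map conf = Utils.readStormConfig();   // defaults.yaml merged with storm.yaml
        System.out.println("zookeeper servers : " + conf.get(Config.STORM_ZOOKEEPER_SERVERS));
        System.out.println("nimbus seeds      : " + conf.get(Config.NIMBUS_SEEDS));
        System.out.println("local dir         : " + conf.get(Config.STORM_LOCAL_DIR));
        System.out.println("worker slots      : " + conf.get(Config.SUPERVISOR_SLOTS_PORTS));
    }
}
```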
Since the Storm UI has been started, entering http://uplooking01:8080 in the address bar shows the Storm cluster's information:
The information it displays also gives a very intuitive reflection of the configuration we did above.
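The summary shown by the UI can also be pulled from Nimbus programmatically over its Thrift API. The snippet below is only a rough sketch against the Storm 1.x client classes (NimbusClient, ClusterSummary); treat the exact getter names as assumptions and check them against the storm-core version you run.

```java
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.SupervisorSummary;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

import java.util.Map;

// Rough sketch: ask Nimbus for the same cluster summary the Storm UI displays (Storm 1.x client classes assumed).
public class ClusterInfo {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
        ClusterSummary summary = nimbus.getClient().getClusterInfo();
        for (SupervisorSummary s : summary.get_supervisors()) {
            System.out.println("supervisor " + s.get_host()
                    + " slots used: " + s.get_num_used_workers() + "/" + s.get_num_workers());
        }
        for (TopologySummary t : summary.get_topologies()) {
            System.out.println("topology " + t.get_name() + " status: " + t.get_status());
        }
        nimbus.close();
    }
}
```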
We use the running-sum example from earlier:
```java
package cn.xpleaf.bigdata.storm.remote;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1. Running-sum example: the data source keeps producing increasing numbers, and we accumulate their sum.
 * <p>
 * Storm components: Spout and Bolt; the data unit is the Tuple; a Topology built in main() ties the spout and bolt together.
 * MapReduce components: Mapper and Reducer; the data unit is a Writable; a Job built in main() ties the two together.
 * <p>
 * Adapter pattern: BaseRichSpout overrides the methods of the implemented interface that we do not need,
 * giving them empty bodies. This is what we call the adapter pattern.
 */
public class StormSumTopology {

    /**
     * Data source
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;                       // configuration of this component
        private TopologyContext context;        // context object of this component
        private SpoutOutputCollector collector; // collector used to emit tuples

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Core method that produces the data
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("Order amount generated at " + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ": " + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * Declares the schema of the emitted data
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * Bolt that computes the sum
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;                  // configuration of this component
        private TopologyContext context;   // context object of this component
        private OutputCollector collector; // collector used to emit tuples

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * Core method that processes the data
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;
            System.out.println("Total transaction amount of the mall site as of " + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ": " + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * Can be left empty if this bolt is the last processing unit
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /**
     * Builds the topology, analogous to building a Job in MapReduce
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * Set up the DAG (directed acyclic graph) of spouts and bolts
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout"); // the stream grouping specifies this component's upstream source

        // Use the builder to create the topology
        StormTopology topology = builder.createTopology();
        String topologyName = StormSumTopology.class.getSimpleName();  // topology name
        Config config = new Config();   // Config extends HashMap but wraps some basic settings

        // Launch the topology: LocalCluster for local mode, StormSubmitter for cluster mode
        if (args == null || args.length < 1) {  // no arguments: local mode; otherwise: cluster mode
            LocalCluster localCluster = new LocalCluster(); // local development mode uses a LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}
```
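The topology above relies on a small helper class, `cn.xpleaf.bigdata.storm.utils.StormUtil`, whose source is not shown in this post. Based purely on how it is used (a `sleep` helper and a `yyyyMMddHHmmss` date format), a hypothetical minimal version could look like the sketch below; the real implementation in the project may differ.

```java
package cn.xpleaf.bigdata.storm.utils;

import java.text.SimpleDateFormat;

// Hypothetical reconstruction of the StormUtil helper referenced by the topology above;
// only the two members it uses (df_yyyyMMddHHmmss and sleep) are sketched here.
public class StormUtil {

    // Date format used when printing timestamps in the spout and bolt
    public static final SimpleDateFormat df_yyyyMMddHHmmss = new SimpleDateFormat("yyyyMMddHHmmss");

    // Sleep without forcing callers to handle InterruptedException
    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```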
As you can see, the difference lies in how the job is submitted at the end; for cluster mode it is `StormSubmitter.submitTopology(topologyName, config, topology);`.
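When submitting to the cluster it is also common to set a few extra options on `Config`, for example the number of worker processes the topology should occupy (bounded by the `supervisor.slots.ports` list configured earlier) and a parallelism hint per component. The class below is only an illustrative sketch of such a cluster-mode launcher, not part of the original example; the worker and parallelism numbers are assumptions.

```java
package cn.xpleaf.bigdata.storm.remote;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Sketch only: a cluster-mode variant of the main() above with a few common Config tweaks.
// Not part of the original example; worker and parallelism numbers are illustrative.
public class StormSumClusterLauncher {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setNumWorkers(2);   // request 2 worker processes out of the 4 slots each supervisor offers
        config.setDebug(false);    // true would log every emitted tuple (very verbose)

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("id_order_spout", new StormSumTopology.OrderSpout(), 1); // parallelism hint: 1 executor
        builder.setBolt("id_sum_bolt", new StormSumTopology.SumBolt(), 1)         // keep the running sum single-threaded
               .shuffleGrouping("id_order_spout");

        StormSubmitter.submitTopology("StormSumTopology", config, builder.createTopology());
    }
}
```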
Packaging here is done with Maven; make sure the visibility scope of the `storm-core` dependency and the relevant packaging plugin are configured in pom.xml:
```xml
<!-- dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <!-- With scope "provided" the dependency is not included when packaging, but for local development
         and testing it should be commented out, otherwise the program will not run -->
    <!-- The Storm dependency does not need to be packaged because the cluster already has the Storm jars -->
    <scope>provided</scope>
</dependency>

<!-- packaging plugin -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- package the dependencies as well -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- The main class can be specified here, so that after the jar is built you do not
                     have to name the class to run on the command line -->
                <mainClass>
                </mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```
Configure the Maven packaging command in IDEA:
```
clean package -DskipTests
```
After that the jar can be built and uploaded to our cluster environment.
```
[uplooking@uplooking01 storm]$ cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
-bash: cn.xpleaf.bigdata.storm.remote.StormSumTopology: command not found
[uplooking@uplooking01 storm]$ storm jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
Running: /opt/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/uplooking/app/storm -Dstorm.log.dir=/home/uplooking/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/uplooking/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/uplooking/app/storm/lib/reflectasm-1.10.1.jar:/home/uplooking/app/storm/lib/disruptor-3.3.2.jar:/home/uplooking/app/storm/lib/clojure-1.7.0.jar:/home/uplooking/app/storm/lib/objenesis-2.1.jar:/home/uplooking/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/uplooking/app/storm/lib/slf4j-api-1.7.7.jar:/home/uplooking/app/storm/lib/log4j-core-2.1.jar:/home/uplooking/app/storm/lib/storm-core-1.0.2.jar:/home/uplooking/app/storm/lib/storm-rename-hack-1.0.2.jar:/home/uplooking/app/storm/lib/kryo-3.0.3.jar:/home/uplooking/app/storm/lib/asm-5.0.3.jar:/home/uplooking/app/storm/lib/log4j-api-2.1.jar:/home/uplooking/app/storm/lib/servlet-api-2.5.jar:/home/uplooking/app/storm/lib/minlog-1.3.0.jar:storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/uplooking/app/storm/conf:/home/uplooking/app/storm/bin -Dstorm.jar=storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
842 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8973061592627522790:-5130577098800003128
934 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
1036 [main] INFO o.a.s.StormSubmitter - Uploading topology jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO o.a.s.StormSubmitter - Submitting topology StormSumTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8973061592627522790:-5130577098800003128"}
1710 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: StormSumTopology
```
Note the output: the jar is uploaded to /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar, and afterwards the jar can be found on the `leader` node:
```
[uplooking@uplooking02 inbox]$ pwd
/home/uplooking/data/storm/nimbus/inbox
[uplooking@uplooking02 inbox]$ ls
stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
```
Because the `uplooking01` node is not the `leader` at this point, the jar is not present on it; this is worth noting.
You can check the cluster status at this point in the Storm UI:

Then look at the detailed topology information:

Then look at the details of a spout or bolt:
You can see that the `Executors` are running on `uplooking02`, so we can go to that node to check the output:
```
[uplooking@uplooking02 6700]$ pwd
/home/uplooking/app/storm/logs/workers-artifacts/StormSumTopology-1-1523548000/6700
[uplooking@uplooking02 6700]$ tail -5 worker.log
2018-04-13 00:39:56.636 STDIO [INFO] Total transaction amount of the mall site as of 20180413003956: 5054610
2018-04-13 00:39:57.636 STDIO [INFO] Order amount generated at 20180413003957: 3181
2018-04-13 00:39:57.637 STDIO [INFO] Total transaction amount of the mall site as of 20180413003957: 5057790
2018-04-13 00:39:58.638 STDIO [INFO] Order amount generated at 20180413003958: 3182
2018-04-13 00:39:58.639 STDIO [INFO] Total transaction amount of the mall site as of 20180413003958: 5060971
```
Note that `uplooking03` has none of this output, because the cluster handed the job to the `supervisor` on `uplooking02`. It is also worth knowing that the jar from earlier can be found under the data directory on `uplooking02` as well; it was distributed there by `nimbus`:
```
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ pwd
/home/uplooking/data/storm/supervisor/stormdist/StormSumTopology-1-1523548000
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ ls
stormcode.ser  stormconf.ser  stormjar.jar
```
But on `uplooking03`, again, it is not present.
You can also run the jps command on `uplooking02` and see the `worker` process:
```
[uplooking@uplooking02 ~]$ jps
2224 QuorumPeerMain
1858 Elasticsearch
27427 logviewer
2291 NameNode
27972 LogWriter
27988 worker
25878 nimbus
28006 Jps
26054 supervisor
2552 DFSZKFailoverController
2365 DataNode
2462 JournalNode
```
The output can actually also be viewed directly in the Storm UI: on the page shown above, click the `6700` link. The prerequisite is that the `logviewer` has already been started on `uplooking02`:
```
storm logviewer &
```
The output shown there looks like this:
From the above we know that the `worker` is currently running on `uplooking02`. If you kill the process directly on that node, it is automatically restarted:
```
[uplooking@uplooking02 ~]$ jps | grep worker
27988 worker
[uplooking@uplooking02 ~]$ kill -9 27988
[uplooking@uplooking02 ~]$ jps | grep worker
[uplooking@uplooking02 ~]$ kill 27988
kill 27988: No such process
[uplooking@uplooking02 ~]$ jps | grep worker
28235 worker
```
Of course, if you really do want to stop a topology job, there are two ways:
The first is through the topology page in the Storm UI: under "Topology actions" there is a Kill action; just click it.

The second is from the command line:

```
[uplooking@uplooking01 ~]$ storm kill
Syntax: [storm kill topology-name [-w wait-time-secs]]
```

The number of seconds after `-w` is how long to wait before the topology job is stopped.
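Besides the UI button and the `storm kill` command, a topology can also be killed through the Nimbus Thrift client. The sketch below assumes the Storm 1.x client classes (NimbusClient, KillOptions); the wait time plays the same role as `-w` above.

```java
import org.apache.storm.generated.KillOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

import java.util.Map;

// Sketch only: kill a running topology through the Nimbus Thrift API (Storm 1.x client classes assumed).
public class KillRunningTopology {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);

        KillOptions opts = new KillOptions();
        opts.set_wait_secs(10);   // same meaning as "storm kill StormSumTopology -w 10"

        nimbus.getClient().killTopologyWithOpts("StormSumTopology", opts);
        nimbus.close();
    }
}
```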
As a further check: if you shut down everything on the three hosts except the `worker` processes (nimbus, supervisor, and so on), the `worker` keeps running normally and data keeps being produced; the only difference is that no new jobs can be submitted to the Storm cluster.