Storm是Twitter開源的分佈式實時大數據處理框架,最先開源於github,從0.9.1版本以後,歸於Apache社區,被業界稱爲實時版Hadoop。隨着愈來愈多的場景對Hadoop的MapReduce高延遲沒法容忍,好比網站統計、推薦系統、預警系統、金融系統(高頻交易、股票)等等,大數據實時處理解決方案(流計算)的應用日趨普遍,目前已經是分佈式技術領域最新爆發點,而Storm更是流計算技術中的佼佼者和主流。java
Storm程序再Storm集羣中運行的示例圖以下:git
爲何把Topology單獨提出來呢,由於Topology是咱們開發程序主要的用的組件。
Topology和MapReduce很相像。
MapReduce是Map進行獲取數據,Reduce進行處理數據。
而Topology則是使用Spout獲取數據,Bolt來進行計算。
總的來講就是一個Topology由一個或者多個的Spout和Bolt組成。github
具體流程是怎麼走,能夠經過查看下面這張圖來進行了解。
示例圖:數據庫
注:圖片來源http://www.tianshouzhi.com/api/tutorials/storm/52。apache
圖片有三種模式,解釋以下:
第一種比較簡單,就是由一個Spout獲取數據,而後交給一個Bolt進行處理;
第二種稍微複雜點,由一個Spout獲取數據,而後交給一個Bolt進行處理一部分,而後在交給下一個Bolt進行處理其餘部分。
第三種則比較複雜,一個Spout能夠同時發送數據到多個Bolt,而一個Bolt也能夠接受多個Spout或多個Bolt,最終造成多個數據流。可是這種數據流必須是有方向的,有起點和終點,否則會形成死循環,數據永遠也處理不完。就是Spout發給Bolt1,Bolt1發給Bolt2,Bolt2又發給了Bolt1,最終造成了一個環狀。api
以前已經寫過了,這裏就不在說明了。
博客地址:http://www.panchengming.com/2018/01/26/pancm70/數組
前面講了一些Storm概念,可能在理解上不太清楚,那麼這裏咱們就用一個Hello World代碼示例來體驗下Storm運做的流程吧。框架
在進行代碼開發以前,首先得作好相關的準備。
本項目是使用Maven構建的,使用Storm的版本爲1.1.1。
Maven的相關依賴以下:maven
<!--storm相關jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <scope>provided</scope> </dependency>
在寫代碼的時候,咱們先來明確要用Storm作什麼。
那麼第一個程序,就簡單的輸出下信息。
具體步驟以下:分佈式
那麼首先開始編寫Spout類。通常是實現 IRichSpout 或繼承BaseRichSpout該類,而後實現該方法。
這裏咱們繼承BaseRichSpout這個類,該類須要實現這幾個主要的方法:
open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。
有三個參數,它們的做用分別是:
代碼示例:
@Override public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) { System.out.println("open:"+map.get("test")); this.collector = collector; }
nextTuple()方法是Spout實現的核心。
也就是主要執行方法,用於輸出信息,經過collector.emit
方法發射。
這裏咱們的數據信息已經寫死了,因此這裏咱們就直接將數據進行發送。
這裏設置只發送兩次。
代碼示例:
@Override public void nextTuple() { if(count<=2){ System.out.println("第"+count+"次開始發送數據..."); this.collector.emit(new Values(message)); } count++; }
declareOutputFields是在IComponent接口中定義,用於聲明數據格式。
即輸出的一個Tuple中,包含幾個字段。
由於這裏咱們只發射一個,因此就指定一個。若是是多個,則用逗號隔開。
代碼示例:
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("定義格式..."); declarer.declare(new Fields(field)); }
ack是在ISpout接口中定義,用於表示Tuple處理成功。
代碼示例:
@Override public void ack(Object obj) { System.out.println("ack:"+obj); }
fail是在ISpout接口中定義,用於表示Tuple處理失敗。
代碼示例:
@Override public void fail(Object obj) { System.out.println("失敗:"+obj); }
close是在ISpout接口中定義,用於表示Topology中止。
代碼示例:
@Override public void close() { System.out.println("關閉..."); }
至於還有其餘的,這裏就不在一一列舉了。
Bolt是用於處理數據的組件,主要是由execute方法來進行實現。通常來講須要實現 IRichBolt 或繼承BaseRichBolt該類,而後實現其方法。
須要實現方法以下:
在Bolt啓動前執行,提供Bolt啓動環境配置的入口。
參數基本和Sqout同樣。
通常對於不可序列化的對象進行實例化。
這裏的咱們就簡單的打印下
@Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.collector=collector; }
注:若是是能夠序列化的對象,那麼最好是使用構造函數。
execute()方法是Bolt實現的核心。
也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
從tuple中獲取消息可使用 tuple.getString()
和tuple.getStringByField();
這兩個方法。我的推薦第二種,能夠經過field來指定接收的消息。
注:若是繼承的是IRichBolt,則須要手動ack。這裏就不用了,BaseRichBolt會自動幫咱們應答。
代碼示例:
@Override public void execute(Tuple tuple) { // String msg=tuple.getString(0); String msg=tuple.getStringByField("test"); //這裏咱們就不作消息的處理,只打印 System.out.println("Bolt第"+count+"接受的消息:"+msg); count++; /** * * 沒次調用處理一個輸入的tuple,全部的tuple都必須在必定時間內應答。 * 能夠是ack或者fail。不然,spout就會重發tuple。 */ // collector.ack(tuple); }
和Spout的同樣。
由於到了這裏就再也不輸出了,因此就什麼都沒寫。
@Override public void declareOutputFields(OutputFieldsDeclarer arg0) { }
cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。
Storm在終止一個bolt以前會調用這個方法。
由於這裏沒有什麼資源須要釋放,因此就簡單的打印一句就好了。
@Override public void cleanup() { System.out.println("資源釋放"); }
這裏咱們就是用main方法進行提交topology。
不過在提交topology以前,須要進行相應的設置。
這裏我就不一一細說了,代碼的註釋已經很詳細了。
代碼示例:
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; /** * * Title: App * Description: * storm測試 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class App { private static final String str1="test1"; private static final String str2="test2"; public static void main(String[] args) { // TODO Auto-generated method stub //定義一個拓撲 TopologyBuilder builder=new TopologyBuilder(); //設置一個Executeor(線程),默認一個 builder.setSpout(str1, new TestSpout()); //設置一個Executeor(線程),和一個task builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1); Config conf = new Config(); conf.put("test", "test"); try{ //運行拓撲 if(args !=null&&args.length>0){ //有參數時,表示向集羣提交做業,並把第一個參數當作topology名稱 System.out.println("遠程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else{//沒有參數時,本地提交 //啓動本地模式 System.out.println("本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("111" ,conf, builder.createTopology() ); Thread.sleep(10000); // 關閉本地集羣 cluster.shutdown(); } }catch (Exception e){ e.printStackTrace(); } } }
運行該方法,輸出結果以下:
本地模式 定義格式... open:test 第1次開始發送數據... 第2次開始發送數據... prepare:test Bolt第1接受的消息:這是個測試消息! Bolt第2接受的消息:這是個測試消息! 資源釋放 關閉...
到這裏,是否是基本上對Storm的運做有些瞭解了呢。
這個demo達到了上述的三種模式圖中的第一種,一個Spout傳輸數據, 一個Bolt處理數據。
那麼若是咱們想達到第二種模式呢,那又該如何作呢?
假如咱們想統計下在一段文本中的單詞出現頻率的話,咱們只需執行一下步驟就能夠了。
1.首先將Spout中的message消息進行更改成數組,並依次將消息發送到TestBolt。
2.而後TestBolt將獲取的數據進行分割,將分割的數據發送到TestBolt2。
3.TestBolt2對數據進行統計,在程序關閉的時候進行打印。
4.Topology成功配置而且啓動以後,等待20秒左右,關閉程序,而後獲得輸出的結果。
代碼示例以下:
Spout
用於發送消息。
import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; /** * * Title: TestSpout * Description: * 發送信息 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class TestSpout extends BaseRichSpout{ private static final long serialVersionUID = 225243592780939490L; private SpoutOutputCollector collector; private static final String field="word"; private int count=1; private String[] message = { "My nickname is xuwujing", "My blog address is http://www.panchengming.com/", "My interest is playing games" }; /** * open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。 * 有三個參數: * 1.Storm配置的Map; * 2.topology中組件的信息; * 3.發射tuple的方法; */ @Override public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) { System.out.println("open:"+map.get("test")); this.collector = collector; } /** * nextTuple()方法是Spout實現的核心。 * 也就是主要執行方法,用於輸出信息,經過collector.emit方法發射。 */ @Override public void nextTuple() { if(count<=message.length){ System.out.println("第"+count +"次開始發送數據..."); this.collector.emit(new Values(message[count-1])); } count++; } /** * declareOutputFields是在IComponent接口中定義,用於聲明數據格式。 * 即輸出的一個Tuple中,包含幾個字段。 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("定義格式..."); declarer.declare(new Fields(field)); } /** * 當一個Tuple處理成功時,會調用這個方法 */ @Override public void ack(Object obj) { System.out.println("ack:"+obj); } /** * 當Topology中止時,會調用這個方法 */ @Override public void close() { System.out.println("關閉..."); } /** * 當一個Tuple處理失敗時,會調用這個方法 */ @Override public void fail(Object obj) { System.out.println("失敗:"+obj); } }
TestBolt
用於分割單詞。
import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * * Title: TestBolt * Description: * 對單詞進行分割 * Version:1.0.0 * @author pancm * @date 2018年3月16日 */ public class TestBolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4743224635827696343L; private OutputCollector collector; /** * 在Bolt啓動前執行,提供Bolt啓動環境配置的入口 * 通常對於不可序列化的對象進行實例化。 * 注:若是是能夠序列化的對象,那麼最好是使用構造函數。 */ @Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.collector=collector; } /** * execute()方法是Bolt實現的核心。 * 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。 */ @Override public void execute(Tuple tuple) { String msg=tuple.getStringByField("word"); System.out.println("開始分割單詞:"+msg); String[] words = msg.toLowerCase().split(" "); for (String word : words) { this.collector.emit(new Values(word));//向下一個bolt發射數據 } } /** * 聲明數據格式 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("count")); } /** * cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。 * Storm在終止一個bolt以前會調用這個方法。 */ @Override public void cleanup() { System.out.println("TestBolt的資源釋放"); } }
Test2Bolt
用於統計單詞出現次數。
import java.util.HashMap; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; /** * * Title: Test2Bolt * Description: * 統計單詞出現的次數 * Version:1.0.0 * @author pancm * @date 2018年3月16日 */ public class Test2Bolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4743224635827696343L; /** * 保存單詞和對應的計數 */ private HashMap<String, Integer> counts = null; private long count=1; /** * 在Bolt啓動前執行,提供Bolt啓動環境配置的入口 * 通常對於不可序列化的對象進行實例化。 * 注:若是是能夠序列化的對象,那麼最好是使用構造函數。 */ @Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.counts=new HashMap<String, Integer>(); } /** * execute()方法是Bolt實現的核心。 * 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。 * */ @Override public void execute(Tuple tuple) { String msg=tuple.getStringByField("count"); System.out.println("第"+count+"次統計單詞出現的次數"); /** * 若是不包含該單詞,說明在該map是第一次出現 * 不然進行加1 */ if (!counts.containsKey(msg)) { counts.put(msg, 1); } else { counts.put(msg, counts.get(msg)+1); } count++; } /** * cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。 * Storm在終止一個bolt以前會調用這個方法。 */ @Override public void cleanup() { System.out.println("===========開始顯示單詞數量============"); for (Map.Entry<String, Integer> entry : counts.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue()); } System.out.println("===========結束============"); System.out.println("Test2Bolt的資源釋放"); } /** * 聲明數據格式 */ @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } }
Topology
主程序入口。
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * * Title: App * Description: * storm測試 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class App { private static final String test_spout="test_spout"; private static final String test_bolt="test_bolt"; private static final String test2_bolt="test2_bolt"; public static void main(String[] args) { //定義一個拓撲 TopologyBuilder builder=new TopologyBuilder(); //設置一個Executeor(線程),默認一個 builder.setSpout(test_spout, new TestSpout(),1); //shuffleGrouping:表示是隨機分組 //設置一個Executeor(線程),和一個task builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout); //fieldsGrouping:表示是按字段分組 //設置一個Executeor(線程),和一個task builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count")); Config conf = new Config(); conf.put("test", "test"); try{ //運行拓撲 if(args !=null&&args.length>0){ //有參數時,表示向集羣提交做業,並把第一個參數當作topology名稱 System.out.println("運行遠程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else{//沒有參數時,本地提交 //啓動本地模式 System.out.println("運行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Word-counts" ,conf, builder.createTopology() ); Thread.sleep(20000); // //關閉本地集羣 cluster.shutdown(); } }catch (Exception e){ e.printStackTrace(); } } }
輸出結果:
運行本地模式 定義格式... open:test 第1次開始發送數據... 第2次開始發送數據... 第3次開始發送數據... prepare:test prepare:test 開始分割單詞:My nickname is xuwujing 開始分割單詞:My blog address is http://www.panchengming.com/ 開始分割單詞:My interest is playing games 第1次統計單詞出現的次數 第2次統計單詞出現的次數 第3次統計單詞出現的次數 第4次統計單詞出現的次數 第5次統計單詞出現的次數 第6次統計單詞出現的次數 第7次統計單詞出現的次數 第8次統計單詞出現的次數 第9次統計單詞出現的次數 第10次統計單詞出現的次數 第11次統計單詞出現的次數 第12次統計單詞出現的次數 第13次統計單詞出現的次數 第14次統計單詞出現的次數 ===========開始顯示單詞數量============ address: 1 interest: 1 nickname: 1 games: 1 is: 3 xuwujing: 1 playing: 1 my: 3 blog: 1 http://www.panchengming.com/: 1 ===========結束============ Test2Bolt的資源釋放 TestBolt的資源釋放 關閉...
上述的是本地模式運行,若是想在Storm集羣中進行使用,只須要將程序打包爲jar,而後將程序上傳到storm集羣中,
輸入:
storm jar xxx.jar xxx xxx
說明:第一個xxx是storm程序打包的包名,第二個xxx是運行主程序的路徑,第三個xxx則表示主程序輸入的參數,這個能夠隨意。
若是是使用maven打包的話,則須要在pom.xml加上
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.pancm.storm.App</mainClass> </manifest> </archive> </configuration> </plugin>
成功運行程序以後,能夠在Storm集羣的UI界面查看該程序的狀態。
到此,本文結束,謝謝閱讀! 本篇文章源碼地址: https://github.com/xuwujing/java-study