[TOC]java
1個worker進程執行的是1個topology的子集(注:不會出現1個worker爲多個topology服務)。1個worker進程會啓動1個或多個executor線程來執行1個topology的(spout或bolt)。所以,1個運行中的topology就是由集羣中多臺(多是一臺)物理機上的一個或者多個worker進程組成的。shell
executor是worker進程啓動的一個單獨線程。apache
每一個executor只會運行1個topology的1個或者多個(spout或bolt)task(注:task能夠是1個或多個,storm默認是1個(spout或bolt)只生成1個task,executor線程會在每次循環裏順序調用全部task實例)。併發
task是最終運行spout或bolt中代碼的執行單元(注:1個task即爲spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啓動後,1個(spout或bolt)的task數目是固定不變的,但該(spout或bolt)使用的executor線程數能夠動態調整(例如:1個executor線程能夠執行該(spout或bolt)的1個或多個task實例)。這意味着,對於1個(spout或bolt)存在這樣的條件:#threads<=#tasks(即:線程數小於等於task數目)。默認狀況下task的數目等於executor線程數目,即1個executor線程只運行1個task。app
默認狀況下,一個supervisor節點最多能夠啓動4個worker進程,每個topology默認佔用一個worker進程,每一個spout或者bolt會佔用1個executor,每一個executor啓動1個task。負載均衡
前面提交做業到集羣時,worker、executor和task的數量狀況以下:ide
以前是1個worker進程 3個executor線程 3個task任務 3個executor,分別爲: id_num_spout id_sum_bolt __acker
如今在代碼中將其worker個數設置爲2,以下:網站
package cn.xpleaf.bigdata.storm.parallelism; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class ParallelismWorkerSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt()) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = ParallelismWorkerSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 /** * 以前是1個worker進程 3個executor線程 3個task任務 * 3個executor,分別爲: * id_num_spout * id_sum_bolt * __acker * * 將worker進程修改成2個後: * executor線程數:4個 * task任務數:4個 * 分析: * 最簡單的緣由就是,咱們的應用程序過小了,徹底沒有必要開啓多個executor線程。 * 也就是說不會簡單的進行worker的副本拷貝,這裏多出來的一個executor線程是每個worker進程都有的 * 一個默認的系統級別的bolt,就是__acker */ config.setNumWorkers(2); // 設置當前topology啓動須要幾個worker進程 // config.setNumAckers(0); // 設置__acker數量爲0個,這樣就不會有其executor線程 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
打包後上傳到集羣中並提交,在storm ui中查看其狀態,以下:ui
能夠看到調整後,三者的數量狀況爲:this
將worker進程修改成2個後: executor線程數:4個 task任務數:4個 分析: 最簡單的緣由就是,咱們的應用程序過小了,徹底沒有必要開啓多個executor線程。 也就是說不會簡單的進行worker的副本拷貝,這裏多出來的一個executor線程是每個worker進程都有的 一個默認的系統級別的bolt,就是__acker
若是不但願系統級別的__acker
運行,能夠在代碼中打開註釋:
config.setNumAckers(0);
即將其個數設置爲0個,而後再上傳到集羣中運行便可。
須要在設置spout
和bolt
時指定:
builder.setSpout("id_order_spout", new OrderSpout(), 2); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件
完整程序代碼以下:
package cn.xpleaf.bigdata.storm.parallelism; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class ParallelismExecutorSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout(), 2); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = ParallelismExecutorSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 /** * 以前是1個worker進程 3個executor線程 3個task任務 * 3個executor,分別爲: * id_num_spout * id_sum_bolt * __acker * * 如今修改成:builder.setSpout("id_order_spout", new OrderSpout(), 2); * builder.setBolt("id_sum_bolt", new SumBolt(), 3) * 因此應該有6個executor,分別爲: * id_num_spout 2個 * id_sum_bolt 3個 * __acker 1個 * 同時task也爲6個 * */ // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
上傳到集羣中提交做業後,狀況以下:
因此應該有6個executor,分別爲: id_num_spout 2個 id_sum_bolt 3個 __acker 1個 同時task也爲6個
另外,若是這時查看輸出的log,會發現spout的輸出,不少狀況下都是一會兒輸出兩條信息,由於此時有兩個線程在運行,而bolt的狀況也是相似的。
...... 2018-04-13 10:22:30.406 STDIO [INFO] 當前時間20180413102230產生的訂單金額:422 2018-04-13 10:22:30.517 STDIO [INFO] 當前時間20180413102230產生的訂單金額:422 2018-04-13 10:22:30.519 STDIO [INFO] 商城網站到目前20180413102230的商品總交易額59013 2018-04-13 10:22:30.520 STDIO [INFO] 商城網站到目前20180413102230的商品總交易額55716 2018-04-13 10:22:31.407 STDIO [INFO] 當前時間20180413102231產生的訂單金額:423 2018-04-13 10:22:31.411 STDIO [INFO] 商城網站到目前20180413102231的商品總交易額62936 2018-04-13 10:22:31.518 STDIO [INFO] 當前時間20180413102231產生的訂單金額:423 2018-04-13 10:22:31.520 STDIO [INFO] 商城網站到目前20180413102231的商品總交易額59434 2018-04-13 10:22:32.408 STDIO [INFO] 當前時間20180413102232產生的訂單金額:424 2018-04-13 10:22:32.411 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額63360 2018-04-13 10:22:32.519 STDIO [INFO] 當前時間20180413102232產生的訂單金額:424 2018-04-13 10:22:32.521 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額59855 2018-04-13 10:22:32.523 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額56140 2018-04-13 10:22:33.409 STDIO [INFO] 當前時間20180413102233產生的訂單金額:425 2018-04-13 10:22:33.520 STDIO [INFO] 當前時間20180413102233產生的訂單金額:425 2018-04-13 10:22:33.521 STDIO [INFO] 商城網站到目前20180413102233的商品總交易額60277 2018-04-13 10:22:33.523 STDIO [INFO] 商城網站到目前20180413102233的商品總交易額63785 2018-04-13 10:22:34.410 STDIO [INFO] 當前時間20180413102234產生的訂單金額:426 2018-04-13 10:22:34.521 STDIO [INFO] 當前時間20180413102234產生的訂單金額:426 2018-04-13 10:22:34.535 STDIO [INFO] 商城網站到目前20180413102234的商品總交易額60700 2018-04-13 10:22:34.535 STDIO [INFO] 商城網站到目前20180413102234的商品總交易額64211 2018-04-13 10:22:35.411 STDIO [INFO] 當前時間20180413102235產生的訂單金額:427 2018-04-13 10:22:35.522 STDIO [INFO] 當前時間20180413102235產生的訂單金額:427 ......
須要在設置spout
和bolt
時指定:
builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2); builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件
完整程序代碼以下:
package cn.xpleaf.bigdata.storm.parallelism; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class ParallelismTaskSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2); builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = ParallelismTaskSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 /** * 以前是1個worker進程 3個executor線程 3個task任務 * 3個executor,分別爲: * id_num_spout * id_sum_bolt * __acker * * 如今修改成:builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2); * builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3) * 因此應該有3個executor,分別爲: * id_num_spout 1個 * id_sum_bolt 1個 * __acker 1個 * 同時task爲6個: * id_num_spout 2個 * id_sum_bolt 3個 * __acker 1個 * */ // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
運行後,其狀況以下:
因此應該有3個executor,分別爲: id_num_spout 1個 id_sum_bolt 1個 __acker 1個 同時task爲6個: id_num_spout 2個 id_sum_bolt 3個 __acker 1個
Worker(slot)
注意:worker之間通訊是經過Netty 進行通訊的
不推薦使用
topologyBuilder.setBolt("green-bolt", new GreenBolt(),2) .setNumTasks(4).shuffleGrouping("blue-spout);
# 10秒以後開始調整 # Reconfigure the topology "mytopology" to use 5 worker processes, # the spout "blue-spout" to use 3 executors and # the bolt "yellow-bolt" to use 10 executors. storm rebalance topologyName -w 10 -n 5 -e spout_id=3 -e id_bolt=10
注意:acker數目運行時是不會變化的,因此多指定幾個worker進程,acker線程數也不會增長。 -w:表示超時時間,rebalance首先會在一個超時時間內註銷掉拓撲,而後在整個集羣中從新分配 worker。
問題:
-e spout_id=3 -e id_bolt=10 有時不會增長併發度
緣由:
You can only increase the parallelism (number of executors) to the number of tasks. So if your component is having for example (number of executors: 50, number of tasks: 50) then you can not increase the parallelism, however you can decrease it. 就是說spout和bolt的並行數,最多能夠調整到它的taskNum,默認狀況下,taskNum是和你設置的paralismNum相同的。 #threads<=#tasks
那麼到底並行度設置多少合適呢,理論參考值:
上面的案例對於分析storm的並行度會有很是大的幫助,同時也很是清晰地說明了worker
、executor
、task
三者之間的關係。
假設storm集羣如今有三個節點。一個做爲nimbus,兩個做爲supervisor。到這裏先介紹一下storm邏輯上有兩個component,一個是Spout,另外一個是Bolt。stream由Spout發出,在不一樣的Bolt之間進行處理,在其中傳遞的是storm的基本處理單位:Tuple。由Spout發出一個一個Tuple,而後Bolt接收Tuple進行各類各樣的處理。這一整個過程構成一個DAG(有向無環圖)。在storm裏面叫作Topology。
上圖中spout的處理邏輯是將一句話發出給下一個Bolt,而後下一個Bolt作句子的單詞分割,下一個作計數,最後的Bolt作彙總顯示。這裏能夠有多個Bolt或者Spout進行並行處理。
那麼這裏有一個問題,數據是如何從spout到bolt中的呢,若是bolt是多個狀況呢?這就是咱們所說的流分組,也就是在Spout與Bolt、Bolt與Bolt之間傳遞Tuple的方式,咱們稱之爲流分組storm grouping。
Shuffle Grouping
隨機分組, 隨機派發stream裏面的tuple, 保證bolt中的每一個任務接收到的tuple數目相同.(它能實現較好的負載均衡)
Fields Grouping
按字段分組, 好比按userid來分組, 具備一樣userid的tuple會被分到同一任務, 而不一樣的userid則會被分配到不一樣的任務
All Grouping
廣播發送,對於每個tuple,Bolts中的全部任務都會收到.
Global Grouping
全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.
Non Grouping
隨機分派,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是同樣的效果,
Direct Grouping
直接分組,這是一種比較特別的分組方法,用這種分組意味着消息的發送者具體由消息接收者的哪一個task處理這個消息.只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法.並且這種消息tuple必須使用emitDirect方法來發射.消息處理者能夠經過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)
localOrShuffleGrouping
是指若是目標Bolt 中的一個或者多個Task 和當前產生數據的Task 在同一個Worker 進程裏面,那麼就走內部的線程間通訊,將Tuple 直接發給在當前Worker 進程的目的Task。不然,同shuffleGrouping。(在工做中使用的頻率仍是比較高的)
CustomStreamGrouping
自定義流式分組。
將計算總和的例子,spout並行度設置爲1,bolt並行度設置爲3,group方式設置爲Shuffle Grouping
,程序代碼以下:
package cn.xpleaf.bigdata.storm.group; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class ShuffleGroupingSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = ShuffleGroupingSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
在集羣中啓動後,查看輸出的日誌信息:
...... 2018-04-13 11:53:58.848 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115358的商品總交易額31 2018-04-13 11:53:59.846 STDIO [INFO] 當前時間20180413115359產生的訂單金額:14 2018-04-13 11:53:59.850 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115359的商品總交易額30 2018-04-13 11:54:00.847 STDIO [INFO] 當前時間20180413115400產生的訂單金額:15 2018-04-13 11:54:00.851 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115400的商品總交易額45 2018-04-13 11:54:01.848 STDIO [INFO] 當前時間20180413115401產生的訂單金額:16 2018-04-13 11:54:01.851 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413115401的商品總交易額60 2018-04-13 11:54:02.849 STDIO [INFO] 當前時間20180413115402產生的訂單金額:17 2018-04-13 11:54:02.852 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115402的商品總交易額62 2018-04-13 11:54:03.851 STDIO [INFO] 當前時間20180413115403產生的訂單金額:18 2018-04-13 11:54:03.855 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413115403的商品總交易額78 2018-04-13 11:54:04.852 STDIO [INFO] 當前時間20180413115404產生的訂單金額:19 2018-04-13 11:54:04.856 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115404的商品總交易額50 2018-04-13 11:54:05.853 STDIO [INFO] 當前時間20180413115405產生的訂單金額:20 2018-04-13 11:54:05.858 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115405的商品總交易額70 2018-04-13 11:54:06.855 STDIO [INFO] 當前時間20180413115406產生的訂單金額:21 ......
能夠看到有bolt有3個線程在執行。
將計算總和的例子,spout並行度設置爲1,bolt並行度設置爲3,group方式設置爲AllGrouping
,程序代碼以下:
package cn.xpleaf.bigdata.storm.group; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class AllGroupingSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .allGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = AllGroupingSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
上傳到集羣提交後,輸出結果以下:
2018-04-13 12:42:36.992 STDIO [INFO] 當前時間20180413124236產生的訂單金額:1 2018-04-13 12:42:36.998 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124236的商品總交易額1 2018-04-13 12:42:36.999 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124236的商品總交易額1 2018-04-13 12:42:37.000 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124236的商品總交易額1 2018-04-13 12:42:37.995 STDIO [INFO] 當前時間20180413124237產生的訂單金額:2 2018-04-13 12:42:37.999 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124237的商品總交易額3 2018-04-13 12:42:38.000 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124237的商品總交易額3 2018-04-13 12:42:38.000 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124238的商品總交易額3 2018-04-13 12:42:38.996 STDIO [INFO] 當前時間20180413124238產生的訂單金額:3 2018-04-13 12:42:39.000 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124239的商品總交易額6 2018-04-13 12:42:39.000 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124239的商品總交易額6 2018-04-13 12:42:39.001 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124239的商品總交易額6 2018-04-13 12:42:39.998 STDIO [INFO] 當前時間20180413124239產生的訂單金額:4 2018-04-13 12:42:40.001 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124240的商品總交易額10 2018-04-13 12:42:40.002 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124240的商品總交易額10 2018-04-13 12:42:40.002 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124240的商品總交易額10 2018-04-13 12:42:40.999 STDIO [INFO] 當前時間20180413124240產生的訂單金額:5 2018-04-13 12:42:41.002 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124241的商品總交易額15 2018-04-13 12:42:41.003 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124241的商品總交易額15 2018-04-13 12:42:41.003 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124241的商品總交易額15 2018-04-13 12:42:42.000 STDIO [INFO] 當前時間20180413124242產生的訂單金額:6 2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124242的商品總交易額21 2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124242的商品總交易額21 2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124242的商品總交易額21 ......
能夠看到有三個輸出的bolt都同時收到了spout發送過來的tuple,這確實有點浪費資源。
注意,上面查看bolt的輸出結果,這與多個線程只輸出一份數據不同,由於其三個輸出都會同時輸出相同的一份數據,而若是隻是多個線程非AllGrouping的狀況下,不會同一份數據輸出屢次的,這點尤爲須要注意。
將計算總和的例子,spout並行度設置爲1,bolt並行度設置爲3,group方式設置爲GlobalGrouping
,程序代碼以下:
package cn.xpleaf.bigdata.storm.group; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class GlobalGroupingSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .globalGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = GlobalGroupingSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
打包上傳到集羣運行後,查看其輸出結果以下:
2018-04-13 12:56:06.506 STDIO [INFO] 當前時間20180413125606產生的訂單金額:1 2018-04-13 12:56:06.515 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125606的商品總交易額1 2018-04-13 12:56:07.512 STDIO [INFO] 當前時間20180413125607產生的訂單金額:2 2018-04-13 12:56:07.516 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125607的商品總交易額3 2018-04-13 12:56:08.513 STDIO [INFO] 當前時間20180413125608產生的訂單金額:3 2018-04-13 12:56:08.517 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125608的商品總交易額6 2018-04-13 12:56:09.515 STDIO [INFO] 當前時間20180413125609產生的訂單金額:4 2018-04-13 12:56:09.519 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125609的商品總交易額10 2018-04-13 12:56:10.518 STDIO [INFO] 當前時間20180413125610產生的訂單金額:5 2018-04-13 12:56:10.521 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125610的商品總交易額15 2018-04-13 12:56:11.519 STDIO [INFO] 當前時間20180413125611產生的訂單金額:6 2018-04-13 12:56:11.523 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125611的商品總交易額21 2018-04-13 12:56:12.520 STDIO [INFO] 當前時間20180413125612產生的訂單金額:7 2018-04-13 12:56:12.524 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125612的商品總交易額28 2018-04-13 12:56:13.521 STDIO [INFO] 當前時間20180413125613產生的訂單金額:8 2018-04-13 12:56:13.525 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125613的商品總交易額36 ......
能夠看到這與AllGrouping徹底不一樣,三個bolt的executor線程,可是卻只有一個在執行操做。
經過前面三個流式分組方式的驗證,能夠很是清晰地瞭解其含義:
ShuffleGrouping:三個bolt線程,同時執行,但對於同一個tuple數據,只有一個bolt會接收到,而且是隨機的。
AllGrouping:三個bolt線程,同時執行,但對於同一個tuple數據,3個bolt都會接收到。
GlobalGrouping:三個bolt線程,同時執行,但對於同一個tuple數據,只有固定一個bolt會接收到,其它2個bolt不會接收到。
在計算總和的例子上,再添加一個user_id
的field,對其進行取模計算,同時在設置流式分組方式爲根據user_id
進行分組,而且爲了驗證其概念,設置bolt的並行度爲3,這樣理論上來講是,spout上產生的模爲1 2 0的的userId的tuple會分別發送到三個不一樣線程ID的bolt上,後面咱們只須要觀察輸出便可。
程序代碼以下:
package cn.xpleaf.bigdata.storm.group; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 * * 流式分組之filedsGrouping,字段分組 * 有點像SQL中的group by * 或者能夠理解爲hash取模分區 */ public class FieldsGroupingSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; long userId = num % 3; // 0 1 2 System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ",用戶-->" + userId + "<--產生的訂單金額:" + num); this.collector.emit(new Values(userId, num)); StormUtil.sleep(1000); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user_id", "order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long userId = input.getLongByField("user_id"); Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "用戶-->" + userId + "<--的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .fieldsGrouping("id_order_spout", new Fields("user_id")); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = FieldsGroupingSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
打包上傳到集羣並提交做業後,輸出結果以下:
2018-04-13 15:53:37.836 STDIO [INFO] 當前時間20180413155337,用戶-->1<--產生的訂單金額:1 2018-04-13 15:53:37.843 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155337用戶-->1<--的商品總交易額1 2018-04-13 15:53:38.839 STDIO [INFO] 當前時間20180413155338,用戶-->2<--產生的訂單金額:2 2018-04-13 15:53:38.844 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155338用戶-->2<--的商品總交易額2 2018-04-13 15:53:39.841 STDIO [INFO] 當前時間20180413155339,用戶-->0<--產生的訂單金額:3 2018-04-13 15:53:39.845 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155339用戶-->0<--的商品總交易額3 2018-04-13 15:53:40.842 STDIO [INFO] 當前時間20180413155340,用戶-->1<--產生的訂單金額:4 2018-04-13 15:53:40.846 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155340用戶-->1<--的商品總交易額5 2018-04-13 15:53:41.844 STDIO [INFO] 當前時間20180413155341,用戶-->2<--產生的訂單金額:5 2018-04-13 15:53:41.850 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155341用戶-->2<--的商品總交易額7 2018-04-13 15:53:42.848 STDIO [INFO] 當前時間20180413155342,用戶-->0<--產生的訂單金額:6 2018-04-13 15:53:42.851 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155342用戶-->0<--的商品總交易額9 2018-04-13 15:53:43.849 STDIO [INFO] 當前時間20180413155343,用戶-->1<--產生的訂單金額:7 2018-04-13 15:53:43.852 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155343用戶-->1<--的商品總交易額12 2018-04-13 15:53:44.850 STDIO [INFO] 當前時間20180413155344,用戶-->2<--產生的訂單金額:8 2018-04-13 15:53:44.853 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155344用戶-->2<--的商品總交易額15 2018-04-13 15:53:45.852 STDIO [INFO] 當前時間20180413155345,用戶-->0<--產生的訂單金額:9 2018-04-13 15:53:45.855 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155345用戶-->0<--的商品總交易額18 2018-04-13 15:53:46.853 STDIO [INFO] 當前時間20180413155346,用戶-->1<--產生的訂單金額:10 2018-04-13 15:53:46.856 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155346用戶-->1<--的商品總交易額22 2018-04-13 15:53:47.854 STDIO [INFO] 當前時間20180413155347,用戶-->2<--產生的訂單金額:11 2018-04-13 15:53:47.859 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155347用戶-->2<--的商品總交易額26 2018-04-13 15:53:48.855 STDIO [INFO] 當前時間20180413155348,用戶-->0<--產生的訂單金額:12 2018-04-13 15:53:48.860 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155348用戶-->0<--的商品總交易額30 2018-04-13 15:53:49.857 STDIO [INFO] 當前時間20180413155349,用戶-->1<--產生的訂單金額:13 2018-04-13 15:53:49.860 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155349用戶-->1<--的商品總交易額35 2018-04-13 15:53:50.859 STDIO [INFO] 當前時間20180413155350,用戶-->2<--產生的訂單金額:14 2018-04-13 15:53:50.862 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155350用戶-->2<--的商品總交易額40 2018-04-13 15:53:51.860 STDIO [INFO] 當前時間20180413155351,用戶-->0<--產生的訂單金額:15 2018-04-13 15:53:51.863 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155351用戶-->0<--的商品總交易額45 2018-04-13 15:53:52.861 STDIO [INFO] 當前時間20180413155352,用戶-->1<--產生的訂單金額:16 2018-04-13 15:53:52.863 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155352用戶-->1<--的商品總交易額51 2018-04-13 15:53:53.862 STDIO [INFO] 當前時間20180413155353,用戶-->2<--產生的訂單金額:17 2018-04-13 15:53:53.866 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155353用戶-->2<--的商品總交易額57 2018-04-13 15:53:54.863 STDIO [INFO] 當前時間20180413155354,用戶-->0<--產生的訂單金額:18 2018-04-13 15:53:54.867 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155354用戶-->0<--的商品總交易額63 ......
那麼結果就顯而易見了,user_id模爲1的tuple都發送到ID爲45的線程上,user_id模爲2的tuple都發送到ID爲39的線程上,user_id模爲0的tuple都發送到ID爲47的線程上。
自定義流式分組,自定義的Custom Grouping以下:
/** * 自定義的流式分組 * 模擬globalGrouping--->將全部的數據,傳遞到其中的一個task中 * 模擬fieldsGrouping(後面有時間本身能夠實現這一個) */ class MyCustomStreamingGrouping implements CustomStreamGrouping { private WorkerTopologyContext context; private GlobalStreamId stream; private List<Integer> targetTasks; /** * 相似自定義spout或bolt的初始化動做 * @param context * @param stream * @param targetTasks bolt對應的task的列表,若是咱們在bolt.setNum(3)--->targetTasks的大小就是3 */ @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { this.context = context; this.stream = stream; this.targetTasks = targetTasks; System.out.println("bolt對應的task列表: " + targetTasks); } /** * * @param taskId * @param values 就是tuple * @return */ @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { if(targetTasks.size() < 1) { throw new RuntimeException("bolt的task個數竟然爲0,沒有任務執行做業"); } return Arrays.asList(targetTasks.get(0)); } }
其實這就是模擬Global Grouping的自定義流式分組,依然是計算總和的例子,其代碼以下:
package cn.xpleaf.bigdata.storm.group; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.StormTopology; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 * * 流式分組之customGrouping,用戶自定義分組 */ public class CustomGroupingSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; long userId = num % 3; // 0 1 2 System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ",用戶-->" + userId + "<--產生的訂單金額:" + num); this.collector.emit(new Values(userId, num)); StormUtil.sleep(1000); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user_id", "order_cost")); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long userId = input.getLongByField("user_id"); Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "用戶-->" + userId + "<--的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 3) .customGrouping("id_order_spout", new MyCustomStreamingGrouping()); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = CustomGroupingSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
上傳到集羣並提交做業,其輸出結果以下:
2018-04-13 16:21:23.919 STDIO [INFO] 當前時間20180413162123,用戶-->1<--產生的訂單金額:1 2018-04-13 16:21:23.924 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162123用戶-->1<--的商品總交易額1 2018-04-13 16:21:24.922 STDIO [INFO] 當前時間20180413162124,用戶-->2<--產生的訂單金額:2 2018-04-13 16:21:24.926 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162124用戶-->2<--的商品總交易額3 2018-04-13 16:21:25.923 STDIO [INFO] 當前時間20180413162125,用戶-->0<--產生的訂單金額:3 2018-04-13 16:21:25.926 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162125用戶-->0<--的商品總交易額6 2018-04-13 16:21:26.925 STDIO [INFO] 當前時間20180413162126,用戶-->1<--產生的訂單金額:4 2018-04-13 16:21:26.928 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162126用戶-->1<--的商品總交易額10 2018-04-13 16:21:27.926 STDIO [INFO] 當前時間20180413162127,用戶-->2<--產生的訂單金額:5 2018-04-13 16:21:27.930 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162127用戶-->2<--的商品總交易額15 2018-04-13 16:21:28.928 STDIO [INFO] 當前時間20180413162128,用戶-->0<--產生的訂單金額:6 2018-04-13 16:21:28.931 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162128用戶-->0<--的商品總交易額21 2018-04-13 16:21:29.929 STDIO [INFO] 當前時間20180413162129,用戶-->1<--產生的訂單金額:7 2018-04-13 16:21:29.932 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162129用戶-->1<--的商品總交易額28 2018-04-13 16:21:30.930 STDIO [INFO] 當前時間20180413162130,用戶-->2<--產生的訂單金額:8 2018-04-13 16:21:30.934 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162130用戶-->2<--的商品總交易額36 2018-04-13 16:21:31.932 STDIO [INFO] 當前時間20180413162131,用戶-->0<--產生的訂單金額:9 ......
能夠看到只有一個executor接收到tuple數據,也就是說,經過使用自定義流式分組,確實實現了Global Grouping的功能。