下圖爲 Strom 的運行流程圖,在開發 Storm 流處理程序時,咱們須要採用內置或自定義實現 spout
(數據源) 和 bolt
(處理單元),並經過 TopologyBuilder
將它們之間進行關聯,造成 Topology
。html
IComponent
接口定義了 Topology 中全部組件 (spout/bolt) 的公共方法,自定義的 spout 或 bolt 必須直接或間接實現這個接口。java
public interface IComponent extends Serializable { /** * 聲明此拓撲的全部流的輸出模式。 * @param declarer 這用於聲明輸出流 id,輸出字段以及每一個輸出流是不是直接流(direct stream) */ void declareOutputFields(OutputFieldsDeclarer declarer); /** * 聲明此組件的配置。 * */ Map<String, Object> getComponentConfiguration(); }
自定義的 spout 須要實現 ISpout
接口,它定義了 spout 的全部可用方法:git
public interface ISpout extends Serializable { /** * 組件初始化時候被調用 * * @param conf ISpout 的配置 * @param context 應用上下文,能夠經過其獲取任務 ID 和組件 ID,輸入和輸出信息等。 * @param collector 用來發送 spout 中的 tuples,它是線程安全的,建議保存爲此 spout 對象的實例變量 */ void open(Map conf, TopologyContext context, SpoutOutputCollector collector); /** * ISpout 將要被關閉的時候調用。可是其不必定會被執行,若是在集羣環境中經過 kill -9 殺死進程時其就沒法被執行。 */ void close(); /** * 當 ISpout 從停用狀態激活時被調用 */ void activate(); /** * 當 ISpout 停用時候被調用 */ void deactivate(); /** * 這是一個核心方法,主要經過在此方法中調用 collector 將 tuples 發送給下一個接收器,這個方法必須是非阻塞的。 * nextTuple/ack/fail/是在同一個線程中執行的,因此不用考慮線程安全方面。當沒有 tuples 發出時應該讓 * nextTuple 休眠 (sleep) 一下,以避免浪費 CPU。 */ void nextTuple(); /** * 經過 msgId 進行 tuples 處理成功的確認,被確認後的 tuples 不會再次被髮送 */ void ack(Object msgId); /** * 經過 msgId 進行 tuples 處理失敗的確認,被確認後的 tuples 會再次被髮送進行處理 */ void fail(Object msgId); }
一般狀況下,咱們實現自定義的 Spout 時不會直接去實現 ISpout
接口,而是繼承 BaseRichSpout
。BaseRichSpout
繼承自 BaseCompont
,同時實現了 IRichSpout
接口。github
IRichSpout
接口繼承自 ISpout
和 IComponent
,自身並無定義任何方法:shell
public interface IRichSpout extends ISpout, IComponent { }
BaseComponent
抽象類空實現了 IComponent
中 getComponentConfiguration
方法:apache
public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; } }
BaseRichSpout
繼承自 BaseCompont
類並實現了 IRichSpout
接口,而且空實現了其中部分方法:安全
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { @Override public void close() {} @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} }
經過這樣的設計,咱們在繼承 BaseRichSpout
實現自定義 spout 時,就只有三個方法必須實現:服務器
SpoutOutputCollector
;bolt 接口的設計與 spout 的相似:app
/** * 在客戶端計算機上建立的 IBolt 對象。會被被序列化到 topology 中(使用 Java 序列化),並提交給集羣的主機(Nimbus)。 * Nimbus 啓動 workers 反序列化對象,調用 prepare,而後開始處理 tuples。 */ public interface IBolt extends Serializable { /** * 組件初始化時候被調用 * * @param conf storm 中定義的此 bolt 的配置 * @param context 應用上下文,能夠經過其獲取任務 ID 和組件 ID,輸入和輸出信息等。 * @param collector 用來發送 spout 中的 tuples,它是線程安全的,建議保存爲此 spout 對象的實例變量 */ void prepare(Map stormConf, TopologyContext context, OutputCollector collector); /** * 處理單個 tuple 輸入。 * * @param Tuple 對象包含關於它的元數據(如來自哪一個組件/流/任務) */ void execute(Tuple input); /** * IBolt 將要被關閉的時候調用。可是其不必定會被執行,若是在集羣環境中經過 kill -9 殺死進程時其就沒法被執行。 */ void cleanup();
一樣的,在實現自定義 bolt 時,一般是繼承 BaseRichBolt
抽象類來實現。BaseRichBolt
繼承自 BaseComponent
抽象類並實現了 IRichBolt
接口。dom
IRichBolt
接口繼承自 IBolt
和 IComponent
,自身並無定義任何方法:
public interface IRichBolt extends IBolt, IComponent { }
經過這樣的設計,在繼承 BaseRichBolt
實現自定義 bolt 時,就只須要實現三個必須的方法:
OutputCollector
;這裏咱們使用自定義的 DataSourceSpout
產生詞頻數據,而後使用自定義的 SplitBolt
和 CountBolt
來進行詞頻統計。
案例源碼下載地址:storm-word-count
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> </dependency>
public class DataSourceSpout extends BaseRichSpout { private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } @Override public void nextTuple() { // 模擬產生數據 String lineData = productData(); spoutOutputCollector.emit(new Values(lineData)); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); } /** * 模擬數據 */ private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); } }
上面類使用 productData
方法來產生模擬數據,產生數據的格式以下:
Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm
public class SplitBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split("\t"); for (String word : words) { collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
public class CountBolt extends BaseRichBolt { private Map<String, Integer> counts = new HashMap<>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); // 輸出 System.out.print("當前實時統計結果:"); counts.forEach((key, value) -> System.out.print(key + ":" + value + "; ")); System.out.println(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
經過 TopologyBuilder 將上面定義好的組件進行串聯造成 Topology,並提交到本地集羣(LocalCluster)運行。一般在開發中,可先用本地模式進行測試,測試完成後再提交到服務器集羣運行。
public class LocalWordCountApp { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); // 指明將 DataSourceSpout 的數據發送到 SplitBolt 中處理 builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); // 指明將 SplitBolt 的數據發送到 CountBolt 中 處理 builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); // 建立本地集羣用於測試 這種模式不須要本機安裝 storm,直接運行該 Main 方法便可 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology()); } }
啓動 WordCountApp
的 main 方法便可運行,採用本地模式 Storm 會自動在本地搭建一個集羣,因此啓動的過程會稍慢一點,啓動成功後便可看到輸出日誌。
提交到服務器的代碼和本地代碼略有不一樣,提交到服務器集羣時須要使用 StormSubmitter
進行提交。主要代碼以下:
爲告終構清晰,這裏新建 ClusterWordCountApp 類來演示集羣模式的提交。實際開發中能夠將兩種模式的代碼寫在同一個類中,經過外部傳參來決定啓動何種模式。
public class ClusterWordCountApp { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); // 指明將 DataSourceSpout 的數據發送到 SplitBolt 中處理 builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); // 指明將 SplitBolt 的數據發送到 CountBolt 中 處理 builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); // 使用 StormSubmitter 提交 Topology 到服務器集羣 try { StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology()); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); } } }
打包後上傳到服務器任意位置,這裏我打包後的名稱爲 storm-word-count-1.0.jar
# mvn clean package -Dmaven.test.skip=true
使用如下命令提交 Topology 到集羣:
# 命令格式: storm jar jar包位置 主類的全路徑 ...可選傳參 storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp
出現 successfully
則表明提交成功:
# 查看全部Topology storm list # 中止 storm kill topology-name [-w wait-time-secs] storm kill ClusterWordCountApp -w 3
使用 UI 界面一樣也可進行中止操做,進入 WEB UI 界面(8080 端口),在 Topology Summary
中點擊對應 Topology 便可進入詳情頁面進行操做。
在上面的步驟中,咱們沒有在 POM 中配置任何插件,就直接使用 mvn package
進行項目打包,這對於沒有使用外部依賴包的項目是可行的。但若是項目中使用了第三方 JAR 包,就會出現問題,由於 package
打包後的 JAR 中是不含有依賴包的,若是此時你提交到服務器上運行,就會出現找不到第三方依賴的異常。
這時候可能你們會有疑惑,在咱們的項目中不是使用了 storm-core
這個依賴嗎?其實上面之因此咱們能運行成功,是由於在 Storm 的集羣環境中提供了這個 JAR 包,在安裝目錄的 lib 目錄下:
爲了說明這個問題我在 Maven 中引入了一個第三方的 JAR 包,並修改產生數據的方法:
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency>
StringUtils.join()
這個方法在 commons.lang3
和 storm-core
中都有,原來的代碼無需任何更改,只須要在 import
時指明使用 commons.lang3
。
import org.apache.commons.lang3.StringUtils; private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); }
此時直接使用 mvn clean package
打包運行,就會拋出下圖的異常。所以這種直接打包的方式並不適用於實際的開發,由於實際開發中一般都是須要第三方的 JAR 包。
想把依賴包一併打入最後的 JAR 中,maven 提供了兩個插件來實現,分別是 maven-assembly-plugin
和 maven-shade-plugin
。鑑於本篇文章篇幅已經比較長,且關於 Storm 打包還有不少須要說明的地方,因此關於 Storm 的打包方式單獨整理至下一篇文章:
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南