Storm是一個開源的分佈式實時計算系統,能夠簡單、可靠的處理大量的數據流。java
Storm有不少使用場景:如實時分析,在線機器學習,持續計算,分佈式RPC,ETL等等。node
Storm支持水平擴展,具備高容錯性,保證每一個消息都會獲得處理,並且處理速度很快(在一個小集羣中,每一個結點每秒能夠處理數以百萬計的消息)。web
Storm的部署和運維都很便捷,並且更爲重要的是可使用任意編程語言來開發應用。數據庫
storm結構稱爲topology(拓撲),由stream(數據流)、spout(數據流的生成者)、bolt(數據流運算者)組成。編程
下圖爲官網提供的模型:json
不一樣於Hadoop中的job,Storm中的topology會一直運行下去,除非進程被殺死或取消部署。數組
Storm的核心數據結構是tuple(元組),本質上是包含了一個或多個鍵值對的列表。Stream是由無限個的tuple組成的序列。服務器
spout鏈接數據源,將數據轉化爲tuple,並將tuple做爲數據流進行發射。開發一個spout的主要工做就是利用API編寫代碼從數據源消費數據流。網絡
spout的數據源能夠有不少種來源:數據結構
web或者移動程序的點擊流、社交網絡的信息、傳感器收集到的數據、應用程序產生的日誌信息。
spout一般只負責轉換數據、發射數據,不會用於處理業務邏輯,防止與業務產生耦合,從而能夠很方便的實現spout的複用。
bolt主要負責數據的運算。將接收到的數據實施運算後,選擇性的輸出一個或多個數據流。
一個bolt能夠接收多個由spout或其餘bolt發射的數據流,從而能夠組建出複雜的數據轉換和處理的網絡拓撲結構。
bolt常見的典型功能:
過濾、鏈接、聚合、計算和數據庫的讀寫。
案例:Word Count案例
語句Spout-->語句分隔Bolt-->單詞計數Bolt-->上報Bolt。
SentenceSpout
做爲入門案例,能夠直接從一個數組中不斷讀取語句,做爲數據來源。
SentenceSpout不斷讀取語句將其做爲數據來源,組裝成單值tuple(鍵名sentence,鍵值爲字符串格式的語句)向後發射。
{"sentence":"i am so shuai!"}
代碼:
/** * BaseRichSpout類是ISpout接口和IComponent接口的一個簡便的實現。採用了適配器模式,對用不到的方法提供了默認實現。 */ public class SentenceSpout extends BaseRichSpout { private SpoutOutputCollector collector = null; private String[] sentencer = { "i am so shuai", "do you look me", "you can see me", "i am so shuai", "do you bilive" }; private int index = 0; /** * ISpout接口中定義的方法 全部Spout組件在初始化時都會調用這個方法。 map 包含了Storm配置信息 context * 提供了topology中的組件信息 collector 提供了發射tuple的方法 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 覆蓋自BaseRichSpout中的方法 核心方法 Storm經過調用此方法向發射tuple */ @Override public void nextTuple() { this.collector.emit(new Values(sentencer[index])); index = (index + 1 >= sentencer.length ? 0 : index + 1); Utils.sleep(1000); } /** * IComponent接口中定義的方法 全部的Storm組件(spout、bolt)都要實現此接口。 * 此方法告訴Storm當前組件會發射哪些數據流,每一個數據流中的tuple中包含哪些字段。 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
SplitSenetenceBolt
語句分隔Bolt訂閱SentenceSpout發射的tuple,每接收到一個tuple就獲取"sentence"對應的值,而後將獲得的語句按照空格切分爲一個個單詞。而後將每一個單詞向後發射一個tuple。
{"word":"I"} {"word":"am"} {"word":"so"} {"word":"shuai"}
代碼:
/** * BaseRichBolt 是IComponent 和 IBolt接口的一個簡單實現。採用了適配器模式,對用不到的方法提供了默認實現。 */ public class SplitSenetenceBolt extends BaseRichBolt { private OutputCollector collector = null; /** * 定義在IBolt中的方法 在bolt初始化時調用,用來初始化bolt stormConf 包含了Storm配置信息 context * 提供了topology中的組件信息 collector 提供了發射tuple的方法 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 覆蓋自BaseRichBolt中的方法 核心方法 Storm經過調用此方法向發射tuple */ @Override public void execute(Tuple input) { String centence = input.getStringByField("sentence"); String[] words = centence.split(" "); for (String word : words) { collector.emit(new Values(word)); } } /** * IComponent接口中定義的方法 全部的Storm組件(spout、bolt)都要實現此接口。 * 此方法告訴Storm當前組件會發射哪些數據流,每一個數據流中的tuple中包含哪些字段。 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
WordCountBolt
單詞計數Bolt訂閱SplitSentenceBolt的輸出,保存每一個特定單詞出現次數,當接收到一個新的tuple,會將對應單詞計數加一,並向後發送該單詞的當前計數。
{"word":"I","count":3}
代碼:
public class WordCountBolt extends BaseRichBolt { private OutputCollector collector = null; private Map<String, Integer> counts = null; /** * 注意: 全部的序列化操做最好都在prepare方法中進行 緣由: * Storm在工做時會將全部的bolt和spout組件先進行序列化,而後發送到集羣中, * 若是在序列化以前建立過任何沒法序列化的對象都會形成序列化時拋出NotSerializableException。 * 此處的HashMap自己是能夠序列化的因此不會有這個問題,可是有必要養成這樣的習慣 。 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counts = new HashMap<>(); } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); counts.put(word, counts.containsKey(word) ? counts.get(word) + 1 : 1); this.collector.emit(new Values(word, counts.get(word))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
ReportBolt
上報Bolt訂閱WordCountBolt類的輸出,內部維護一份全部單詞的對應計數的表,當接收到一個tuple時,上報Bolt會更新表中的計數數據,並將值打印到終端。
/** * 此Bolt處於數據流的末端,因此只接受tuple而不發射任何數據流。 */ public class ReportBolt extends BaseRichBolt { private Map<String, Integer> counts = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counts = new HashMap<>(); } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = input.getIntegerByField("count"); counts.put(word, count); } /** * Storm會在終止一個Bolt以前調用此方法。 * 此方法一般用來在Bolt退出以前釋放資源。 * 此處咱們用來輸出統計結果到控制檯。 * 注意: * 真正集羣環境下,cleanup()方法是不可靠的,不能保證必定執行,後續會討論。 */ @Override public void cleanup() { List<String> keys = new ArrayList<>(); keys.addAll(counts.keySet()); Collections.sort(keys); for (String key : keys) { Integer count = counts.get(key); System.err.println("--" + key + "發生了變化----數量爲" + count + "--"); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 處於流末端的tuple,沒有任何輸出數據流,因此此方法爲空 } }
經過main方法組裝處理流程。此處咱們使用單機模式來測試。
public class WCDirver { public static void main(String[] args) throws Exception { // --實例化Spout和Bolt SentenceSpout sentenceSpout = new SentenceSpout(); SplitSenetenceBolt splitSenetenceBolt = new SplitSenetenceBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); // --建立TopologyBuilder類實例 TopologyBuilder builder = new TopologyBuilder(); // --註冊SentenceSpout builder.setSpout("sentence_spout", sentenceSpout); // --註冊SplitSentenceBolt,訂閱SentenceSpout發送的tuple. // 此處使用了shuffleGrouping方法, // 此方法指定全部的tuple隨機均勻的分發給SplitSentenceBolt的實例。 builder.setBolt("split_senetence_bolt", splitSenetenceBolt).shuffleGrouping("sentence_spout"); // --註冊WordCountBolt,,訂閱SplitSentenceBolt發送的tuple // 此處使用了filedsGrouping方法, // 此方法能夠將指定名稱的tuple路由到同一個WordCountBolt實例中 builder.setBolt("word_count_bolt", wordCountBolt).fieldsGrouping("split_senetence_bolt", new Fields("word")); // --註冊ReprotBolt,訂閱WordCountBolt發送的tuple // 此處使用了globalGrouping方法,表示全部的tuple都路由到惟一的ReprotBolt實例中 builder.setBolt("report_bolt", reportBolt).globalGrouping("word_count_bolt"); //--建立配置對象 Config config = new Config(); //--建立表明集羣的對象,LocalCluster表示在本地開發環境來模擬一個完整的Storm集羣 //本地模式是開發和測試的簡單方式,省去了在分佈式集羣中反覆部署的開銷 //另外能夠執行斷點調試很是的便捷 LocalCluster cluster = new LocalCluster(); //--提交Topology給集羣運行 cluster.submitTopology("Wc_Topology", config, builder.createTopology()); //--運行10秒鐘後殺死Topology關閉集羣 Thread.sleep(10 * 1000); cluster.killTopology("Wc_Topology"); cluster.shutdown(); } }
Storm集羣中的topology在以下的四個級別中存在併發:
服務器:配置在Storm集羣中的一個服務器,會執行Topology的一部分運算,一個Storm集羣中包含一個或者多個Node。
JVM虛擬機、進程:指一個Node上相互獨立運做的JVM進程,每一個Node能夠配置運行一個或多個worker。一個Topology會分配到一個或者多個worker上運行。
線程:指一個worker的jvm中運行的java線程。多個task能夠指派給同一個executer來執行。除非是明確指定,Storm默認會給每一個executor分配一個task。
bolt/spout實例:task是spout和bolt的實例,他們的nextTuple()和execute()方法會被executors線程調用執行。
大多數狀況下,除非明確指定,Storm的默認併發設置值是1。即,一臺服務器(node),爲topology分配一個worker,每一個executer執行一個task。
如圖:Storm默認併發機制。
此時惟一的併發機制出如今線程級即Executor。
這個其實就是增長集羣的服務器數量。
能夠經過API和修改配置兩種方式修改分配給topology的woker數量。
API增長woker:
Config config = new Config(); config.setNumWorkers(2);
單機模式下,增長worker的數量不會有任何提高速度的效果。
API增長Executor:
builder.setSpout(spout_id,spout,2); builder.setBolt(bolt_id,bolt,executor_num);
這種辦法爲Spout或Bolt增長線程數量,默認每一個線程都運行該Spout或Bolt的一個task。
API增長Task:
builder.setSpout(...).setNumTasks(2); builder.setBolt(...).setNumTasks(task_num);
若是手動設置過task的數量,task的總數量就是指定的數量個,而無論線程有幾個,這些task會隨機分配在這些個線程內部執行。
數據流分組方式定義了數據如何進行分發。
Storm內置了七種數據流分組方式:
隨機分組。
隨機分發數據流中的tuple給bolt中的各個task,每一個task接收到的tuple數量相同。
按字段分組。
根據指定字段的值進行分組。指定字段具備相同值的tuple會路由到同一個bolt中的task中。
全複製分組。
全部的tuple複製後分發給後續bolt的全部的task。
全局分組。
這種分組方式將全部的tuple路由到惟一一個task上,Storm按照最小task id來選取接受數據的task。這種分組方式下配置bolt和task的併發度沒有意義。
這種方式會致使全部tuple都發送到一個JVM實例上,可能會引發Strom集羣中某個JVM或者服務器出現性能瓶頸或崩潰。
不分組。
在功能上和隨機分組相同,爲未來預留。
指向型分組。
數據源會經過emitDirect()方法來判斷一個tuple應該由哪一個Strom組件來接受。只能在聲明瞭是指向型數據流上使用。
本地或隨機分組。
和隨機分組相似,可是,會將tuple分發給同一個worker內的bolt task,其餘狀況下采用隨機分組方式。這種方式能夠減小網絡傳輸,從而提升topology的性能。
另外能夠自定義數據流分組方式
寫類實現CustomStreamGrouping接口
代碼:
/** * 自定義數據流分組方式 * @author park * */ public class MyStreamGrouping implements CustomStreamGrouping { /** * 運行時調用,用來初始化分組信息 * context:topology上下文對象 * stream:待分組數據流屬性 * targetTasks:全部待選task的標識符列表 * */ @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { } /** * 核心方法,進行task選擇 * taskId:發送tuple的組件id * values:tuple的值 * 返回值要發往哪一個task */ @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { return null; } }
Storm集羣遵循主/從結構。Storm的主節點是半容錯的。
Strom集羣由一個主節點(nimbus)和一個或者多個工做節點(supervisor)組成。
除此以外Storm集羣還須要一個ZooKeeper的來進行集羣協調。
nimbus守護進程主要的責任是管理、協調和監控在集羣上運行的topology。
包括topology的發佈,任務的指派,事件處理失敗時從新指派任務。
將topology發佈到Storm集羣。
將預先打包成jar文件的topology和配置信息提交到nimbus服務器上,一旦nimbus接收到topology的壓縮包,會將jar包分發到足夠數量的supervisor節點上,當supervisor節點接收到了topology壓縮文件,nimbus就會指派task到每一個supervisor而且發送信號指示supervisor生成足夠的worker來執行指派的task。
nimbus記錄全部的supervisor節點的狀態和分配給他們的task,若是nimbus發現某個supervisor沒有上報心跳或者已經不可達了,他將會將故障supervisor分配的task從新分配到集羣中的其餘supervisor節點。
nimbus並不參與topology的數據處理過程,只是負責topology的初始化、任務分發和進程監控。所以,即便nimbus守護進程在topology運行時中止了,只要分配的supervisor和worker健康運行,topology會一直繼續處理數據,因此稱之爲半容錯機制。
supervisor守護進程等待nimbus分配任務後生成並監控workers執行任務。
supervisor和worker都是運行在不一樣的JVM進程上,若是supervisor啓動的worker進程由於錯誤異常退出,supervisor將會嘗試從新生成新的worker進程。