Storm做爲當前最流行的實時計算框架,自Twitter將其開源後就一直備受關注。因爲其具備先天的穩定性以及便捷性,目前被許多大公司所採用,國外像雅虎、雅虎日本、Twitter、OOYALA、Spotify,國內像京東、騰訊、阿里等都使用Storm來完成大量實時計算來爲用戶提供優質服務。目前官方的最新發布版本是0.10.0。html
本文將對官網的Storm手冊進行翻譯,因爲本人英語能力有限,翻譯不免有些不妥之處,望你們指正。點擊此處閱讀官網英文原版。java
-------------------------------------------------------------------------------------------------------------git
幫助手冊 github
在本手冊中,你將瞭解如何建立Storm的Topology和如何將它們部署到Storm的集羣中。本文主要使用java做爲演示語言,可是爲了說明Storm支持多語言編程,一些示例將用Python編寫。
數據庫
前言apache
本手冊使用storm-starter項目中的例子。建議在你的項目中複製這些示例而且在這個示例的基礎上修改來知足你的需求。閱讀設置Storm開發環境和建立Storm項目兩篇文章來設置你的機器。編程
Storm集羣api
一個Storm集羣和一個Hadoop集羣很是像。不一樣的是Handoop上運行的是"MapReduce"任務,而在Storm運行的是"Topology"拓撲圖(比較抽象,這裏暫且翻譯成拓撲圖)。"MapReduce"和"Topology"是很不同的 —— 一個主要的不一樣點是:"MapReduce"最終會完成,而Topology會一直執行下去(除非你手動將其kill掉)。數組
在Storm集羣中有兩種節點:master節點和worker節點。master節點運行一個叫"Nimbus"的守護進行,他很是像Hadoop中的"JobTracker"。Nimbus負責向集羣分發代碼,分配任務,而且監測主機是否出現故障。網絡
每個worker節點運行一個叫"Supervisor"的守護進程。它監聽它所在機器上已經分配的任務,而且在必要時啓動或者中止Nimbus已經分配給它的任務。每一個worker進程執行一個Topology的子集;一個Topology由許多worker組成。
全部Nimbus和Supervisor之間的協調或者調度工做都由Zookeeper集羣來完成。此外,Nimbus和Supervisor守護進程被設計成快速失敗和無狀態的;全部的狀態被保存在Zookeeper集羣或者本地磁盤上。這就意味着你可使用"kill -9"來終止Nimbus和Supervisor,就像什麼也沒發生同樣,Storm會繼續正常運行。這種設計使得Storm集羣很是穩定。下面是Storm的大致結構圖:
Topology(拓撲圖)
要在Storm上進行實時計算,你須要建立Topology。一個Topology就是一個圖的計算。Topology中的每一個節點都包含了邏輯處理,而且指明瞭節點之間如何傳遞數據。
運行一個topology是很是簡單的。首先,你須要將你的代碼和你代碼所依賴的jar打成一個包。而後,運行下面的命令
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
上面運行了backtype.storm.MyTopology這個類,而且附帶兩個參數。這個類的主要功能是定義Topology而且將其提交到Nimbus,storm jar鏈接Nimbus並上傳jar。
因爲Topology被定義成Thrifts結構,並且Nimbus是一個Thrift服務,你能夠用任何編程語言建立與提交Topology。上面的例子是最簡單的而且是基於jvm語言的。閱讀集羣上運行Topology一文來獲取更多的關於啓動和中止Topology的信息。
Stream(流)
Stream是Storm的核心抽象概念,Stream是一個無界的元組序列。Storm經過一個分佈式而且可靠的方式將原始流轉換到新流。例如,你可能將推文流轉換成一個熱門話題。
Storm中完成流轉的角色分別是"Spout"和"Bolt"。在你應用程序中,你須要實現"Spout"和"Bolt"提供的接口來完成業務邏輯。
Spout是Stream的來源。例如,Spout可能從Kestrel隊列讀取數據而且以流的形式將他們發射出去,或者Spout經過Twitter API獲取數據來發射一個推文流。
Bolt消費任意數量的輸入流,進行一些處理,而後以新流的形式發射它們。複雜的流轉,好比經過計算將推文轉換成熱門話題,須要許多步驟,所以須要許多blot。Bolt能夠作不少事情,過濾元組、流聚合、流鏈接、和數據庫對話等等。
一個Spout和Bolt的網絡被打包成一個Topology,它是你提交到Storm集羣執行的最高層次的抽象。一個Topology是一個流轉圖,它的每一個節點是一個Spout或者是一個Bolt。圖中的邊線表示Bolt訂閱了哪些流。當Spout或者Bolt向流發射元組,它將向全部訂閱它的Bolt發射這些流。下圖展現了一個Topology模型:
Topology中節點之間的連接代表了元組如何被傳遞。例如,Spout A和Bolt B之間有一個連接,Spout A和Bolt C之間有一個連接,Bolt和Bolt C之間有一個連接。那麼每次Spout A發射一個元組,它都會將此元組發射到Bolt B和Bolt C。一樣全部Bolt B的輸出也會發射到Bolt C。
Topology中的每一個節點都是並行執行的。你能夠指定每一個節點有多少個線程並行執行,而後Storm會產生相應數量的線程交給集羣去執行。
Topology會永遠執行下去,知道你kill掉它。Storm會自動從新分配失敗的任務。此外,即使機器故障或者消息丟失,Storm也會保證數據不會丟失。
Data model(數據模型)
Storm使用元組做爲它的數據模型。一個元組可使一個list,一個Object或者任何類型。此外,Storm支持全部的原始類型,S字符串、字節數組等做爲它的元祖值。若是你想使用其餘類型的對象,只需實現一個serializer接口便可。
Topology中的每一個節點都必須聲明一個它要發設的元組的類型。例如,下面的Bolt發射了"double"和"triple"兩個元組
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } }
declareOutputFields方法爲組件聲明瞭["double","triple"]兩個輸出域。Bolt的詳細功能將在後面章節中闡述。
A simple topology(Topology簡單示例)
光說不練假把式,讓咱們來經過storm-starter中的ExclamationTopology實例來講明一個Topology如何編寫。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
上面的Topology示例包括一個Spout和兩個Blot。Spout發射word,而後每一個Bolt在其後追加一個"!!!"字符串。節點間的流轉流程是:Spout首先發射到第一個Bolt,而後第一個Bolt再發射到第二個Bolt。若是Spout發射出的元組爲["bob"]和["john"],那麼通過第二個Bolt後將發射["bob!!!!!!"]和["john!!!!!!"]。
代碼中指定節點用setSpout和setBolt方法。這兩個方法都指定了三個入參,他們分別是:用戶本身指定的id、用戶的業務處理邏輯類和指定的並行運行的數量。在上面的例子中,Spout指定id爲"words",Bolt分別指定了"exclaim1"和"exclaim2"做爲id。
邏輯處理類分別實現了IRichSpout(Spout處理類實現)接口和IRichBolt(Bolt處理類實現)接口。
最後一個參數指定了節點並行運行的數量,是可選的。它表示集羣中有多少個線程執行處理邏輯。若是不指定該參數,Storm默認會分配一個線程來執行。
setBolt返回一個InputDeclarer對象。組件"exclaim1"使用shuffleGrouping來代表它會接收"words"組件發射出來的全部元組,一樣組件"exclaim2"使用shuffleGrouping來代表它會接收"exclaim1"發射出來的全部元組。"shuffleGrouping"表示元組會隨機的從輸入任務分發到Bolt任務。組件與組件之間有不少種方法實現數據分組,後面的章節將會詳細闡述。
若是你想讓組件"exclaim2"既讀"words"組件發射出的元組又讀"exclaim1"發射出的元組,你能夠像下面這樣寫:
builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");
正如你上面看到的,能夠鏈式的爲Bolt指定多個輸入源。
咱們來詳細看下這個Topology中的Spout和Bolt的實現。Spout向Topology中發射新消息。這個Topology中的TestWordSpout每隔100ms就會從從列表["nathan", "mike", "jackson", "golda", "bertels"]中隨機取出一個字符串發射出去。TestWordSpout中的nextTuple()方法實現以下:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
如你所見,實現是很是簡單的。
ExclamationBolt向輸入源追加"!!!"字符串。咱們來看下ExclamationBolt的整個實現:
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
prepare方法爲Bolt提供了一個OutputCollector用來發射元組。元組能夠在任什麼時候候被髮射出去--prepare、execute或者cleanup方法均可以,設置能夠在另一個異步線程中發射。在上面的示例中,prepare方法只是簡單的保存了OutputCollector實例以備在後面的execute方法中使用。
execute方法從多個Bolt輸入源中接收一個元組。ExckamationBolt獲取元組的第一個字段,而且追加"!!!"字符串後將其發射出去。若是你要實現一個訂閱了多個輸入源的Bolt,可使用Tuple的getSourceComponent方法即可以知道這個元組來自哪一個組件。
execute中還能夠作一些其餘的事情,即將元組做爲emit的第一個參數,而且在最後一行發送確認信息(ack)。這些是Storm API的一部分,以此來保證數據不丟失,後面章節會詳解。
當Bolt中止運行而且須要清理咱們打開的資源的時候,cleanup方法會被調用。可是,不保證集羣會執行此方法:例如,若是運行任務的機器掛掉,那麼沒有任何辦法來調用cleanup。僅在本地模式(在一個進程中模擬Storm集羣)運行而且想kill掉Topology來節省資源時,建議你使用此方法。
declareOutputFields方法定義了ExclamationBolt發射的元組名爲"word"。
getComponentConfiguration方法容許你指定組件的運行條件。後續章節會有Configuration的闡述。
cleanup和getComponentConfiguration兩個方法在Bolt的實現中不是常常被用到。你可使用提供了默認實現的基本類來簡單的定義Bolt。若是實現BaseRichBolt類ExclamationBolt會更簡潔,以下:
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
本地模式運行ExclamationTopology
本小節咱們來看一下如何在本地模式運行ExclamationTopology。
Storm有兩種運行模式:本地模式和集羣模式。在本地模式下,Storm用線程模擬worker節點。本地模式對開發和測試Topology來講是很是有幫助的。當你運行storm-starter項目中的Topology示例的時候,它們在本地模式運行,而且你能夠看見每一個組件發射的信息。點擊Local mode閱讀更多的在本地模式下運行Topology的信息。
在分佈式模式下,Storm運行在機器集羣上。當你向master提交一個Topology的同時,你也須要提交此Topology的代碼。master會分發你的代碼而且分配worker運行你提交的Topology。若是worker掛掉,master會從新分配其餘的worker。
點擊在集羣上運行Topology一文來閱讀更多的關於如何在集羣模式下運行Topology的信息。
下面是在本地模式下運行ExclamationTopology的代碼:
Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();
首先,代碼經過建立LocalCluster對象來指定僞集羣。向僞集羣提交Topology和向分佈式集羣提交Topology同樣。調用submitTopology方法向LocalCluster提交Topology,第一個參數指定Topology的名稱;第二個參數指定配置;第三個參數指定要提交的Topology。
名稱用於辨別Topology以便你在後面kill掉它。一個Topology會一直運行下去直到你kill掉它。
configuration被用來設置Topology運行的各類參數。下面提到的兩個參數會常常用到:
一、TOPOLOGY_WORKERS(使用setNumWorkers方法設置)參數指定集羣非配多少個進程來運行Topology。每一個Topology中的組件都會以多線程的方式運行。分配給組件的線程數量經過setBolt和setSpout的方法設置。這些線程存在於工做進程中。每一個工做進程包括了若干組件的若干線程。例如,在你的組件中指定了300個線程,在配置中指定了50個工做進程,那麼每一個工做進程將執行6個線程,每一個線程都有可能屬於不一樣的組件。你能夠經過調整Topology中各個組件的並行性,和工做進程的線程數來調優Storm。
二、TOPOLOGY_DEBUG(使用setDebug來設置),當它設置爲true的時候,就告訴Storm記錄每一個組件的每一個信息。這在本地測試Topology時很是有用,然而當運行在生產環境的集羣中時,你可能但願關閉日誌。
你能夠設置不少Topology的參數,點擊the Javadoc for Config來查看詳細信息。
若是想了解若是設置開發環境來在本地模式下運行Topology(例如Eclipse),查看Creating a new Storm project。
Stream groupings(流分組)
一個流分組闡述了Topology如何在兩個組件間發送元組。記住,Spout和Bolt以任務的形式在集羣中並行執行,像下面圖中展現的那樣:
當Bolt A的任務發送給Bolt B,那麼Bolt的任務將會發給誰呢?
"stream grouping"經過告訴Storm如何在任務集合間發送元組回答了這個問題。在咱們深刻了解各類stream grouping錢,咱們先看一下storm-starter項目中的另外一個Topology。WordCountTopology經過Spout讀取句子,而後WordCountBolt再每一個單詞輸出以前輸出它的次數:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 12) .fieldsGrouping("split", new Fields("word"));
SplitSentence發送它收到的每一個句子的每一個單詞的元組,WordCount在內存中保存單詞和次數的Map映射。每次WordCount接收到一個單詞,它都會更新單詞的數量。
Storm有一些不一樣的流分組。其中最簡單的分組叫作"shuffle grouping",它會發射元組到一個隨機的任務。WordCountTopology中使用了"shuffle grouping" 來從RandomSentenceSpout向SplitSentence發射元組。元組將均勻的分發到全部SplitSentence的任務中。
一個更有趣的分組叫作"fields grouping"。"fields grouping"被用於SplitSentence和WordCount之間。它有一個關鍵的做用是保證相同的單詞老是被分發到同一個任務中。不然,不止一個任務會獲得相同的單詞,它們將發射一個錯誤的值,由於它們獲得的是不完整的信息。"fields grouping"容許你經過字段的子集進行分組。這會致使和子集相等的值會去到同一個任務。因爲WordCount經過在字段"word"使用"fields grouping" 來訂閱SplitSentence的輸出流,因此相同的單詞會由同一個任務執行而且Bolt生產出正確的輸出。
"fields grouping"是實現流鏈接、流聚合以及其餘更多的狀況的基礎。"fields grouping"是經過取模哈希來實現的。
還有一些其餘的stream grouping,你能夠閱讀更多關於stream grouping的概念。