在運行一個Storm任務以前,須要瞭解一些概念: node
Storm集羣和Hadoop集羣表面上看很相似。可是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這二者之間是很是不同的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。 數據庫
在Storm的集羣裏面有兩種節點: 控制節點(master node)和工做節點(worker node)。控制節點上面運行一個叫Nimbus後臺程序,它的做用相似Hadoop裏面的JobTracker。Nimbus負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。 服務器
每個工做節點上面運行一個叫作Supervisor的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的一個子集;一個運行的topology由運行在不少機器上的不少工做進程組成。
架構
Nimbus和Supervisor之間的全部協調工做都是經過Zookeeper集羣完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。全部的狀態要麼在zookeeper裏面, 要麼在本地磁盤上。這也就意味着你能夠用kill -9來殺死Nimbus和Supervisor進程, 而後再重啓它們,就好像什麼都沒有發生過。這個設計使得Storm異常的穩定。 併發
一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖: app
一個topology會一直運行直到你手動kill掉,Storm自動從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。 maven
運行一個topology很簡單。首先,把你全部的代碼以及所依賴的jar打進一個jar包。而後運行相似下面的這個命令: 分佈式
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 函數
這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology而且把它提交給Nimbus。storm jar負責鏈接到Nimbus而且上傳jar包。 oop
Topology的定義是一個Thrift結構,而且Nimbus就是一個Thrift服務, 你能夠提交由任何語言建立的topology。上面的方面是用JVM-based語言提交的最簡單的方法。
消息流stream是storm裏的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分佈式的方式並行地建立和處理。經過對stream中tuple序列中每一個字段命名來定義stream。在默認的狀況下,tuple的字段類型能夠是:integer,long,short, byte,string,double,float,boolean和byte array。你也能夠自定義類型(只要實現相應的序列化器)。
每一個消息流在定義的時候會被分配給一個id,由於單向消息流使用的至關廣泛, OutputFieldsDeclarer定義了一些方法讓你能夠定義一個stream而不用指定這個id。在這種狀況下這個stream會分配個值爲‘default’默認的id 。
Storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現spout和bolt提供的接口來處理你的業務邏輯。
消息源spout是Storm裏面一個topology裏面的消息生產者。通常來講消息源會從一個外部源讀取數據而且向topology裏面發出消息:tuple。Spout能夠是可靠的也能夠是不可靠的。若是這個tuple沒有被storm成功處理,可靠的消息源spouts能夠從新發射一個tuple, 可是不可靠的消息源spouts一旦發出一個tuple就不能重發了。
消息源能夠發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,而後使用SpoutOutputCollector來發射指定的stream。
Spout類裏面最重要的方法是nextTuple。要麼發射一個新的tuple到topology裏面或者簡單的返回若是已經沒有新的tuple。要注意的是nextTuple方法不能阻塞,由於storm在同一個線程上面調用全部消息源spout的方法。
另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,不然調用fail。storm只對可靠的spout調用ack和fail。
全部的消息處理邏輯被封裝在bolts裏面。Bolts能夠作不少事情:過濾,聚合,查詢數據庫等等。
Bolts能夠簡單的作消息流的傳遞。複雜的消息流處理每每須要不少步驟,從而也就須要通過不少bolts。好比算出一堆圖片裏面被轉發最多的圖片就至少須要兩步:第一步算出每一個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(若是要把這個過程作得更具備擴展性那麼可能須要更多的步驟)。
Bolts能夠發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
Bolts的主要方法是execute, 它以一個tuple做爲輸入,bolts使用OutputCollector來發射tuple,bolts必需要爲它處理的每個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 通常的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 而後調用ack通知storm本身已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。
定義一個topology的其中一步是定義每一個bolt接收什麼樣的流做爲輸入。stream grouping就是用來定義一個stream應該若是分配數據給bolts上面的多個tasks。
Storm裏面有7種類型的stream grouping
Storm保證每一個tuple會被topology完整的執行。Storm會追蹤由每一個spout tuple所產生的tuple樹(一個bolt處理一個tuple以後可能會發射別的tuple從而造成樹狀結構),而且跟蹤這棵tuple樹何時成功處理完。每一個topology都有一個消息超時的設置,若是storm在這個超時的時間內檢測不到某個tuple樹到底有沒有執行成功, 那麼topology會把這個tuple標記爲執行失敗,而且過一下子從新發射這個tuple。
爲了利用Storm的可靠性特性,在你發出一個新的tuple以及你完成處理一個tuple的時候你必需要通知storm。這一切是由OutputCollector來完成的。經過emit方法來通知一個新的tuple產生了,經過ack方法通知一個tuple處理完成了。
Storm的可靠性咱們在第四章會深刻介紹。
每個spout和bolt會被看成不少task在整個集羣裏執行。每個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎麼從一堆task發射tuple到另一堆task。你能夠調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)。
一個topology可能會在一個或者多個worker(工做進程)裏面執行,每一個worker是一個物理JVM而且執行整個topology的一部分。好比,對於並行度是300的topology來講,若是咱們使用50個工做進程來執行,那麼每一個工做進程會處理其中的6個tasks。Storm會盡可能均勻的工做分配給全部的worker。
Storm裏面有一堆參數能夠配置來調整Nimbus, Supervisor以及正在運行的topology的行爲,一些配置是系統級別的,一些配置是topology級別的。default.yaml裏面有全部的默認配置。你能夠經過定義個storm.yaml在你的classpath裏來覆蓋這些默認配置。而且你也能夠在代碼裏面設置一些topology相關的配置信息(使用StormSubmitter)。
咱們將設計一個topology,來實現對一個句子裏面的單詞出現的頻率進行統計。這是一個簡單的例子,目的是讓你們對於topology快速上手,有一個初步的理解。
在開始開發Storm項目的第一步,就是要設計topology。肯定好你的數據處理邏輯,咱們今天將的這個簡單的例子,topology也很是簡單。整個topology以下:
整個topology分爲三個部分:
KestrelSpout:數據源,負責發送sentence
Splitsentence:負責將sentence切分
Wordcount:負責對單詞的頻率進行累加
這個topology從kestrel queue讀取句子,並把句子劃分紅單詞,而後彙總每一個單詞出現的次數,一個tuple負責讀取句子,每個tuple分別對應計算每個單詞出現的次數,大概樣子以下所示:
1) 構建maven環境:
爲了開發storm topology, 你須要把storm相關的jar包添加到classpath裏面去: 要麼手動添加全部相關的jar包, 要麼使用maven來管理全部的依賴。storm的jar包發佈在Clojars(一個maven庫), 若是你使用maven的話,把下面的配置添加在你項目的pom.xml裏面。
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.5.3</version>
<scope>test</scope>
</dependency>
2) 定義topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout(「kestrel.backtype.com」,22133,
」sentence_queue」,
new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
.fieldsGrouping(2, new Fields(「word」));
這種topology的spout從句子隊列中讀取句子,在kestrel.backtype.com位於一個Kestrel的服務器端口22133。
Spout用setSpout方法插入一個獨特的id到topology。 Topology中的每一個節點必須給予一個id,id是由其餘bolts用於訂閱該節點的輸出流。 KestrelSpout在topology中id爲1。
setBolt是用於在Topology中插入bolts。 在topology中定義的第一個bolts 是切割句子的bolts。 這個bolts 將句子流轉成成單詞流。
讓咱們看看SplitSentence實施:
public class SplitSentence implements IBasicBolt{
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(「 」)) {
collector.emit(new Values(word));
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(「word」));
}
}
關鍵的方法是 execute方法。 正如你能夠看到,它將句子拆分紅單詞,併發出每一個單詞做爲一個新的元組。 另外一個重要的方法是declareOutputFields,其中宣佈bolts輸出元組的架構。 在這裏宣佈,它發出一個域爲word的元組
setBolt的最後一個參數是你想爲bolts的並行量。 SplitSentence bolts 是10個併發,這將致使在storm集羣中有十個線程並行執行。 你所要作的的是增長bolts的並行量在遇到topology的瓶頸時。
setBolt方法返回一個對象,用來定義bolts的輸入。 例如,SplitSentence螺栓訂閱組件「1」使用隨機分組的輸出流。 「1」是指已經定義KestrelSpout。 我將解釋在某一時刻的隨機分組的一部分。 到目前爲止,最要緊的是,SplitSentence bolts會消耗KestrelSpout發出的每個元組。
下面在讓咱們看看wordcount的實現:
public class WordCount implements IBasicBolt {
private Map<String, Integer> _counts = new HashMap<String, Integer>();
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
int count;
if(_counts.containsKey(word)) {
count = _counts.get(word);
} else {
count = 0;
}
count++;
_counts.put(word, count);
collector.emit(new Values(word, count));
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(「word」, 「count」));
}
}
SplitSentence對於句子裏面的每一個單詞發射一個新的tuple, WordCount在內存裏面維護一個單詞->次數的mapping, WordCount每收到一個單詞, 它就更新內存裏面的統計狀態。
storm的運行有兩種模式: 本地模式和分佈式模式.
1) 本地模式:
storm用一個進程裏面的線程來模擬全部的spout和bolt. 本地模式對開發和測試來講比較有用。 你運行storm-starter裏面的topology的時候它們就是以本地模式運行的, 你能夠看到topology裏面的每個組件在發射什麼消息。
2) 分佈式模式:
storm由一堆機器組成。當你提交topology給master的時候, 你同時也把topology的代碼提交了。master負責分發你的代碼而且負責給你的topolgoy分配工做進程。若是一個工做進程掛掉了, master節點會把認爲從新分配到其它節點。
3) 下面是以本地模式運行的代碼:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(「test」, conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(「test」);
cluster.shutdown();
首先, 這個代碼定義經過定義一個LocalCluster對象來定義一個進程內的集羣。提交topology給這個虛擬的集羣和提交topology給分佈式集羣是同樣的。經過調用submitTopology方法來提交topology, 它接受三個參數:要運行的topology的名字,一個配置對象以及要運行的topology自己。
topology的名字是用來惟一區別一個topology的,這樣你而後能夠用這個名字來殺死這個topology的。前面已經說過了, 你必須顯式的殺掉一個topology, 不然它會一直運行。
Conf對象能夠配置不少東西, 下面兩個是最多見的:
TOPOLOGY_WORKERS(setNumWorkers) 定義你但願集羣分配多少個工做進程給你來執行這個topology. topology裏面的每一個組件會被須要線程來執行。每一個組件到底用多少個線程是經過setBolt和setSpout來指定的。這些線程都運行在工做進程裏面. 每個工做進程包含一些節點的一些工做線程。好比, 若是你指定300個線程,60個進程, 那麼每一個工做進程裏面要執行6個線程, 而這6個線程可能屬於不一樣的組件(Spout, Bolt)。你能夠經過調整每一個組件的並行度以及這些線程所在的進程數量來調整topology的性能。
TOPOLOGY_DEBUG(setDebug), 當它被設置成true的話, storm會記錄下每一個組件所發射的每條消息。這在本地環境調試topology頗有用, 可是在線上這麼作的話會影響性能的。
本章從storm的基本對象的定義,到普遍的介紹了storm的開發環境,從一個簡單的例子講解了topology的構建和定義。但願你們能夠從本章的內容對storm有一個基本的理解和概念,而且已經能夠構建一個簡單的topology!!