Storm運行 Zookeeper的配置中 Server.1=127.0.0.1:2888:3888 第一個端口是用於鏈接leader的端口,第二個端口是用於leader選舉的端口 集羣啓動時須要-server,本地啓動不須要。 Zookeeper是快速失敗的,遇到任何錯誤進程會退出,最好經過監控程序管理,保證退出後能自動重啓。 Zookeeper運行過程當中會在datadir目錄下生成不少日誌文件和快照文件,程序並不會按期清理,致使佔用大量磁盤空間,須要經過cron等方式清除沒用的日誌和快照文件。數據庫
Zeromq的使用配置(舊) Netty的使用配置(新)jvm
啓動storm topology Storm jar allmycode.jar org.me.mytopology arg1 arg2 arg3 經過jar包指定主類傳入參數執行 中止 storm kill {toponame}maven
Storm集羣運行的拓撲topology,會永遠運行,除非手動結束。 Hadoop集羣運行的mapreduce job,最終會結束。分佈式
控制節點和工做節點。 控制節點上運行nimbus程序,負責在集羣裏分發代碼,分配集羣任務給機器並監控狀態。 工做節點運行supervisor程序,監聽分配給本機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的子集,一個運行的topology由運行在不少機器上的不少工做進程組成。 Nimbus和supervisor之間的全部協調工做都是經過zookeeper集羣來完成。 Nimbus進程和supervisor進程都是快速失敗和無狀態的,全部的狀態要麼在zookeeper上要麼在本地磁盤上。意味着進程的重啓是無發生的,使storm很是穩定。函數
Topology Stream grouping 將spouts和bolts鏈接起來構成一個topology。Topology會一直運行,直到手動結束。 Storm自動從新分配執行失敗的任務,並保證不會有數據丟失。一些機器意外停機,它上面的全部任務都會被轉移到其餘機器上。oop
Topology運行時,須要打一個jar包,將代碼和依賴庫都打進去。而後聲明主類,傳入參數,執行。 Storm jar allmycode.jar org.me.mytopology arg1 arg2 arg3 這個主類的main函數定義topology,並提交給nimbus。Storm jar負責鏈接到nimbus並上傳jar包。 Topology的定義是一個thrift結構,且nimbus是一個thrift服務,能夠提交由任何語言建立的topology。Jvm-based語言提交的最簡單的方法。ui
Streams 消息流是storm裏的關鍵抽象,消息流是沒有邊界的tuple序列,這些tuple序列會以分佈式的方式並行地建立和處理。 經過對stream中tuple的每一個字段命名定義stream。字段類型豐富。 每一個消息流在定義時會分配一個id。單一消息流使用廣泛,outputfieldsdeclarer定義了方法能夠定義一個stream而不用指定這個id。默認id爲default。線程
Storm提供的最基本的處理stream原語是spout和bolt,能夠實現spout和bolt提供的接口來處理業務邏輯。設計
Spout 消息源spout是一個topology中的消息產生者。通常消息源會從外部源讀取數據而且向topology裏面發出消息tuple。Spout能夠是可靠的也能夠是不可靠的。若是這個tuple沒有被storm處理成功,可靠的消息源spout能夠重發tuple,但不可靠的消息源spout一旦發出一個tuple就不能重發了。 消息源能夠發射多條消息流stream。使用outputfielddeclarer.declareStream定義多個stream,而後使用spoutoutputcollector來發射指定的stream。 Spout類中最重要的方法是nextTuple。要麼發射一個新的tuple到topology裏去,要麼簡單的返回(沒有新tuple)。NextTuple方法不能阻塞,storm在同一個線程上面調用全部消息源spout的方法。日誌
兩個比較重要的方法是ack和fail。Storm檢測到一個tuple被整個topology成功處理的時候調用ack,不然調用fail。Storm只對可靠的spout調用ack和fail。
Bolts 全部消息處理邏輯都被封裝在bolts裏面,bokts能夠作不少事情,過濾,聚合,查詢數據庫等等。 Bolts爲消息流的傳遞。雁過拔毛。複雜的消息流處理每每須要不少步驟,從而通過不少bolts。 如算出一堆圖片中轉發最多的圖片須要兩步,第一步算出每一個圖片的轉發數量,第二步找出轉發最多的前10個圖片。 Bolts能夠發射多條消息流,使用outputfielddeclarer.declareStream定義多個stream,而後使用outputcollector.emit來選擇要發射的stream。
Bolts的主要方法是execute,它以一個tuple做爲輸入,bolts使用outputcollector來發射tuple,bolts必需要爲它處理的每個tuple調用outputcollector的ack方法,以通知storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 通常的流程是:bolts處理一個輸入tuple,發射0個或多個tuple,而後調用ack通知storm本身已經處理過這個tuple了。Storm提供了一個IBasicBolt會自動調用ack。
Stream groupings 定義一個topology的其中一步是定義每一個bolt接收什麼樣的流做爲輸入。 Stream grouping就是用來定義一個stream應該如何分配數據給bolts上面的多個task的。 七種類型的stream grouping。 Shuffle grouping。隨機分組,保證每一個bolt接收到的tuple數目大體相同。 Fields grouping。按字段分組,好比按userid分組,相同userid的tuple會被分配到同一個bolts的同一個task中,而不一樣userid會分配到不一樣task中 All grouping。廣播發送,對於每個tuple,全部的bolt都會收到。 Global grouping。全局分組,這個tuple被分配到storm中一個bolt的其中一個task,分配給id值最小的那個task。 Non grouping。不分組。不關心誰會收到tuple,和隨機分組效果類似。只是storm會把這個bolt放到這個bolt的訂閱者的同一個線程去執行。 Direct grouping。直接分組,消息發送者指定由哪一個消息接收者的哪一個task處理這個消息。只有聲明爲direct stream的消息流可使用這個分組方法。Tuple必須使用emitdirect方法發射。 Local or shuffle grouping。若是目標bolt有一個或多個task在同一個工做進程中,tuple會隨機分配給這些tuple,不然和普通隨機分組行爲一致。
Reliability Storm保證每一個tuple會被topology完整執行。Storm會追蹤由每一個spout tuple所產生的tuple樹。一個bolt處理一個tuple以後可能會發射別的tuple從而造成樹狀結構,並跟蹤這個tuple樹何時成功處理完。每一個tuple都有一個消息超時的設置,若是storm在必定時間內檢測不到tuple樹到底沒有成功執行,那麼會把這個tuple標記爲執行失敗,並過一會重發這個tuple。 爲了利用storm的可靠性特性,在即將發出一個新的tuple以前,以及完成處理一個tuple以後,必需要通知到storm。一切由outputcollector來完成,主要是bolt發送tuple的。經過emit方法通知一個新的tuple產生,經過ack方法通知一個tuple處理完成了。
Task 每個spout和bolt會被當作不少task在整個集羣裏執行,每個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則定義怎麼從一堆task發射tuple到另外一堆task。能夠調用topologybuilder類的setspout和setbolt來設置並行度。就是多少個task。
Workers 一個topology可能會在一個或多個worker裏執行,每一個worker是一個物理jvm而且執行整個topology的一部分。如對於並行度300的topology來講,使用50個工做進程執行,每一個工做進程會處理其中6個tasks。Storm會盡可能均勻的將工做分配給全部的worker。
構建topology 實現目標: 統計詞頻 設計topology結構: Topology分三部分,數據源負責發送sentence,將其切分,對單詞頻率增長。 設計數據流: 從隊列讀取句子,把句子劃分爲單詞,彙總單詞出現的次數,一個tuple負責讀取句子,每個tuple分別對應計算每個單詞出現的次數。 代碼實現: 導入storm自帶的examples,查看maven項目中的內容。